Spark Configuration #

Catalogs #

Spark 3.0 adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under spark.sql.catalog.

This creates an Iceberg catalog named hive_prod that loads tables from a Hive metastore:

spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml

Iceberg also supports a directory-based catalog in HDFS that can be configured using type=hadoop:

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path

The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog.
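
One common way to supply these properties is as `--conf` flags on `spark-submit`. The following is a minimal Python sketch of that idea, not an Iceberg or Spark API: the helper names `catalog_conf` and `to_submit_args` are hypothetical and exist only for illustration.

```python
# Hypothetical helpers (not part of Iceberg or Spark) that assemble the
# spark.sql.catalog.* properties shown above and render them as --conf flags.
DEFAULT_IMPL = "org.apache.iceberg.spark.SparkCatalog"


def catalog_conf(name, props, impl=DEFAULT_IMPL):
    """Return the Spark properties that define one catalog."""
    prefix = f"spark.sql.catalog.{name}"
    conf = {prefix: impl}  # naming the catalog and choosing its implementation
    for key, value in props.items():
        conf[f"{prefix}.{key}"] = value  # nested catalog options
    return conf


def to_submit_args(conf):
    """Render a property dict as a list of spark-submit arguments."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


hadoop_prod = catalog_conf(
    "hadoop_prod",
    {"type": "hadoop", "warehouse": "hdfs://nn:8020/warehouse/path"},
)
print(" ".join(to_submit_args(hadoop_prod)))
```

The same dictionary could equally be written to `spark-defaults.conf` or passed to a session builder; only the rendering step would change.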

Catalog configuration #

A catalog is created and named by adding a property spark.sql.catalog.(catalog-name) with an implementation class for its value.

Iceberg supplies two implementations:

  • org.apache.iceberg.spark.SparkCatalog supports a Hive Metastore or a Hadoop warehouse as a catalog
  • org.apache.iceberg.spark.SparkSessionCatalog adds support for Iceberg tables to Spark’s built-in catalog, and delegates to the built-in catalog for non-Iceberg tables

Both catalogs are configured using properties nested under the catalog name. Common configuration properties for Hive and Hadoop are:

| Property | Values | Description |
|---|---|---|
| spark.sql.catalog.catalog-name.type | hive or hadoop | The underlying Iceberg catalog implementation, HiveCatalog, HadoopCatalog or left unset if using a custom catalog |
| spark.sql.catalog.catalog-name.catalog-impl | | The underlying Iceberg catalog implementation. |
| spark.sql.catalog.catalog-name.default-namespace | default | The default current namespace for the catalog |
| spark.sql.catalog.catalog-name.uri | thrift://host:port | Metastore connect URI; default from hive-site.xml |
| spark.sql.catalog.catalog-name.warehouse | hdfs://nn:8020/warehouse/path | Base path for the warehouse directory |
| spark.sql.catalog.catalog-name.cache-enabled | true or false | Whether to enable catalog cache, default value is true |
| spark.sql.catalog.catalog-name.cache.expiration-interval-ms | 30000 (30 seconds) | Duration after which cached catalog entries are expired; only effective if cache-enabled is true. -1 disables cache expiration and 0 disables caching entirely, irrespective of cache-enabled. Default is 30000 (30 seconds) |

Additional properties can be found in common catalog configuration.
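
The interaction between cache-enabled and cache.expiration-interval-ms is easy to misread, so here is a small Python sketch of the rules as described above. The function `catalog_cache_mode` is hypothetical and only restates the documented behaviour; values below -1 are not described here, so the sketch rejects them as its own choice.

```python
def catalog_cache_mode(cache_enabled=True, expiration_interval_ms=30000):
    """Summarise the documented cache behaviour (illustrative only)."""
    if expiration_interval_ms < -1:
        # Not covered by the description above; rejecting is this sketch's choice.
        raise ValueError("expiration interval must be -1, 0, or positive")
    if expiration_interval_ms == 0:
        # 0 disables caching entirely, irrespective of cache-enabled.
        return "caching disabled"
    if not cache_enabled:
        # The expiration interval is only effective when the cache is enabled.
        return "caching disabled"
    if expiration_interval_ms == -1:
        return "cached, never expires"
    return f"cached, expires after {expiration_interval_ms} ms"


print(catalog_cache_mode())            # defaults: enabled, 30000 ms
print(catalog_cache_mode(True, 0))     # 0 overrides cache-enabled
print(catalog_cache_mode(True, -1))    # expiration turned off
```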

Using catalogs #

Catalog names are used in SQL queries to identify a table. In the examples above, hive_prod and hadoop_prod can be used to prefix database and table names that will be loaded from those catalogs.

SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod

Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.

USE hive_prod.db;
SELECT * FROM table -- load db.table from catalog hive_prod

To see the current catalog and namespace, run SHOW CURRENT NAMESPACE.
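
As a rough mental model of how a dotted name is interpreted, the Python sketch below splits an identifier into catalog, namespace, and table. It is a simplification written for illustration, not Spark's actual resolver: the function `resolve` is hypothetical, and it ignores quoting, temporary views, and other special cases.

```python
def resolve(identifier, catalogs, current_catalog, current_namespace):
    """Split a dotted SQL name into (catalog, namespace, table).

    Simplified model: a leading part that matches a configured catalog name
    selects that catalog; otherwise the current catalog is used.
    """
    parts = identifier.split(".")
    if len(parts) == 1:
        # Bare table name: use the current catalog and namespace.
        return current_catalog, list(current_namespace), parts[0]
    if parts[0] in catalogs:
        # First part names a catalog; the middle parts are the namespace.
        return parts[0], parts[1:-1], parts[-1]
    # No catalog prefix: everything before the table is the namespace.
    return current_catalog, parts[:-1], parts[-1]


catalogs = {"hive_prod", "hadoop_prod"}

# Fully qualified name, as in the first query above.
print(resolve("hive_prod.db.table", catalogs, "spark_catalog", ["default"]))

# After USE hive_prod.db, a bare table name picks up both defaults.
print(resolve("table", catalogs, "hive_prod", ["db"]))
```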

Replacing the session catalog #

To add Iceberg table support to Spark’s built-in catalog, configure spark_catalog to use Iceberg’s SparkSessionCatalog.

spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive

Spark’s built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg’s SparkSessionCatalog as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.

With this configuration, the same Hive Metastore can be used for both Iceberg and non-Iceberg tables.
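
The wrapper behaviour can be pictured with a toy Python model. This is a sketch of the delegation idea only, not Iceberg's implementation: the function `load_table` is hypothetical, and plain dictionaries stand in for the tables tracked in the metastore.

```python
def load_table(name, iceberg_tables, builtin_tables):
    """Toy model: try Iceberg first, otherwise delegate to the built-in catalog."""
    if name in iceberg_tables:
        return "iceberg", iceberg_tables[name]
    # Not an Iceberg table: the built-in session catalog loads it instead.
    return "builtin", builtin_tables[name]


# Hypothetical contents of a shared metastore.
iceberg_tables = {"db.events": "iceberg metadata for db.events"}
builtin_tables = {"db.legacy_logs": "hive table db.legacy_logs"}

print(load_table("db.events", iceberg_tables, builtin_tables))
print(load_table("db.legacy_logs", iceberg_tables, builtin_tables))
```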

Using catalog specific Hadoop configuration values #

Similar to configuring Hadoop properties by using spark.hadoop.*, it’s possible to set per-catalog Hadoop configuration values when using Spark by adding the property for the catalog with the prefix spark.sql.catalog.(catalog-name).hadoop.*. These properties will take precedence over values configured globally using spark.hadoop.* and will only affect Iceberg tables.

spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint = http://aws-local:9000
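
The precedence rule can be sketched in Python as a two-pass merge. This illustrates the rule stated above rather than the code Iceberg runs: the function `effective_hadoop_conf` and the global endpoint value are hypothetical.

```python
def effective_hadoop_conf(spark_conf, catalog_name):
    """Merge global and per-catalog Hadoop settings for one catalog."""
    global_prefix = "spark.hadoop."
    catalog_prefix = f"spark.sql.catalog.{catalog_name}.hadoop."
    merged = {}
    # First pass: values configured globally with spark.hadoop.*
    for key, value in spark_conf.items():
        if key.startswith(global_prefix):
            merged[key[len(global_prefix):]] = value
    # Second pass: per-catalog values overwrite the global ones.
    for key, value in spark_conf.items():
        if key.startswith(catalog_prefix):
            merged[key[len(catalog_prefix):]] = value
    return merged


spark_conf = {
    # Hypothetical global value, used only to show the override.
    "spark.hadoop.fs.s3a.endpoint": "http://global-endpoint:9000",
    "spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint": "http://aws-local:9000",
}

print(effective_hadoop_conf(spark_conf, "hadoop_prod"))  # per-catalog value wins
print(effective_hadoop_conf(spark_conf, "hive_prod"))    # falls back to global
```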

Loading a custom catalog #

Spark supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property. Here is an example:

spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value

Catalogs in Spark 2.4 #

When using Iceberg 0.11.0 and later, Spark 2.4 can load tables from multiple Iceberg catalogs or from table locations.

Catalogs in 2.4 are configured just like catalogs in 3.0, but only Iceberg catalogs are supported.

SQL Extensions #

Iceberg 0.11.0 and later add an extension module to Spark to add new SQL commands, like CALL for stored procedures or ALTER TABLE ... WRITE ORDERED BY.

Using those SQL commands requires adding Iceberg extensions to your Spark environment using the following Spark property:

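| Spark extensions property | Iceberg extensions implementation |
|---|---|
| spark.sql.extensions | org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |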

SQL extensions are not available for Spark 2.4.

Runtime configuration #

Read options #

Spark read options are passed when configuring the DataFrameReader, like this:

// time travel
spark.read
    .option("snapshot-id", 10963874102873L)
    .table("catalog.db.table") // placeholder table identifier

| Spark option | Default | Description |
|---|---|---|
| snapshot-id | (latest) | Snapshot ID of the table snapshot to read |
| as-of-timestamp | (latest) | A timestamp in milliseconds; the snapshot used will be the snapshot current at this time. |
| split-size | As per table property | Overrides this table’s read.split.target-size and read.split.metadata-target-size |
| lookback | As per table property | Overrides this table’s read.split.planning-lookback |
| file-open-cost | As per table property | Overrides this table’s read.split.open-file-cost |
| vectorization-enabled | As per table property | Overrides this table’s read.parquet.vectorization.enabled |
| batch-size | As per table property | Overrides this table’s read.parquet.vectorization.batch-size |
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
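
Both as-of-timestamp and stream-from-timestamp expect milliseconds, which is easy to get wrong when starting from a calendar date. The Python sketch below shows one way to compute the value, assuming the timestamp is meant as milliseconds since the Unix epoch; the helper `to_epoch_millis` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        # Naive datetimes are ambiguous; require an explicit timezone.
        raise ValueError("use a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


as_of = to_epoch_millis(datetime(2021, 1, 1, tzinfo=timezone.utc))
print(as_of)  # 1609459200000
```

The resulting number is what would be passed as the option value, for example as the value of as-of-timestamp.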

Write options #

Spark write options are passed when configuring the DataFrameWriter, like this:

// write with Avro instead of Parquet; df is an existing DataFrame
df.write
    .option("write-format", "avro")
    .option("snapshot-property.key", "value")
    .insertInto("catalog.db.table") // placeholder table identifier

| Spark option | Default | Description |
|---|---|---|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table’s write.target-file-size-bytes |
| check-nullability | true | Sets the nullable check on fields |
| snapshot-property.custom-key | null | Adds an entry with custom-key and corresponding value in the snapshot summary |
| fanout-enabled | false | Overrides this table’s write.spark.fanout.enabled |
| check-ordering | true | Checks if input schema and table schema are the same |
| isolation-level | null | Desired isolation level for DataFrame overwrite operations. null => no checks (for idempotent writes), serializable => check for concurrent inserts or deletes in destination partitions, snapshot => checks for concurrent deletes in destination partitions. |
| validate-from-snapshot-id | null | If isolation level is set, id of base snapshot from which to check concurrent write conflicts into a table. Should be the snapshot before any reads from the table. Can be obtained via Table API or Snapshots table. If null, the table’s oldest known snapshot is used. |
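
To show how the snapshot-property.custom-key pattern and the isolation options fit together, here is a minimal Python sketch that assembles a dictionary of write options. The helper `write_options` and the summary key `job-id` are hypothetical. Passing validate-from-snapshot-id only when an isolation level is set follows the description above, but is this sketch's own choice.

```python
ISOLATION_LEVELS = {None, "serializable", "snapshot"}


def write_options(summary=None, isolation_level=None, validate_from_snapshot_id=None):
    """Build a dict of Spark write options (illustrative helper, not an Iceberg API)."""
    if isolation_level not in ISOLATION_LEVELS:
        raise ValueError(f"unknown isolation level: {isolation_level!r}")
    options = {}
    # Each summary entry becomes a snapshot-property.<custom-key> option.
    for key, value in (summary or {}).items():
        options[f"snapshot-property.{key}"] = str(value)
    if isolation_level is not None:
        options["isolation-level"] = isolation_level
        # The base snapshot id is only meaningful when an isolation level is set.
        if validate_from_snapshot_id is not None:
            options["validate-from-snapshot-id"] = str(validate_from_snapshot_id)
    return options


opts = write_options(
    summary={"job-id": "42"},  # hypothetical custom key
    isolation_level="serializable",
    validate_from_snapshot_id=10963874102873,
)
print(opts)
```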