Skip to content

Flink Configuration🔗

Catalog Configuration🔗

A catalog is created and named by executing the following query (replace <catalog_name> with your catalog name and <config_key>=<config_value> with catalog implementation config):

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 

The following properties can be set globally and are not limited to a specific catalog implementation:

Property Required Values Description
type ✔️ iceberg Must be iceberg.
catalog-type hive, hadoop, rest, glue, jdbc or nessie The underlying Iceberg catalog implementation, HiveCatalog, HadoopCatalog, RESTCatalog, GlueCatalog, JdbcCatalog, NessieCatalog or left unset if using a custom catalog implementation via catalog-impl
catalog-impl The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset.
property-version Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1.
cache-enabled true or false Whether to enable catalog cache, default value is true.
cache.expiration-interval-ms How long catalog entries are locally cached, in milliseconds; negative values like -1 will disable expiration, value 0 is not allowed to set. default value is -1.

The following properties can be set if using the Hive catalog:

Property Required Values Description
uri ✔️ The Hive metastore's thrift URI.
clients The Hive metastore client pool size, default value is 2.
warehouse The Hive warehouse location, users should specify this path if neither set the hive-conf-dir to specify a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to classpath.
hive-conf-dir Path to a directory containing a hive-site.xml configuration file which will be used to provide custom Hive configuration values. The value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or hive configure file from classpath) will be overwritten with the warehouse value if setting both hive-conf-dir and warehouse when creating iceberg catalog.
hadoop-conf-dir Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.

The following properties can be set if using the Hadoop catalog:

Property Required Values Description
warehouse ✔️ The HDFS directory to store metadata files and data files.

The following properties can be set if using the REST catalog:

Property Required Values Description
uri ✔️ The URL to the REST Catalog.
credential A credential to exchange for a token in the OAuth2 client credentials flow.
token A token which will be used to interact with the server.

Runtime configuration🔗

Read options🔗

Flink read options are passed when configuring the Flink IcebergSource:

IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") \ set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()

For Flink SQL, read options can be passed in via SQL hints like this:

SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...

Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode.

env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...

Read option has the highest priority, followed by Flink configuration and then Table property.

Read option Flink configuration Table property Default Description
snapshot-id N/A N/A null For time travel in batch mode. Read data from the specified snapshot-id.
case-sensitive connector.iceberg.case-sensitive N/A false If true, match column name in a case sensitive way.
as-of-timestamp N/A N/A null For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds.
starting-strategy connector.iceberg.starting-strategy N/A INCREMENTAL_FROM_LATEST_SNAPSHOT Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Just for FIP27 Source.
start-snapshot-timestamp N/A N/A null Start to read data from the most recent snapshot as of the given time in milliseconds.
start-snapshot-id N/A N/A null Start to read data from the specified snapshot-id.
end-snapshot-id N/A N/A The latest snapshot id Specifies the end snapshot.
branch N/A N/A main Specifies the branch to read from in batch mode
tag N/A N/A null Specifies the tag to read from in batch mode
start-tag N/A N/A null Specifies the starting tag to read from for incremental reads
end-tag N/A N/A null Specifies the ending tag to to read from for incremental reads
split-size connector.iceberg.split-size read.split.target-size 128 MB Target size when combining input splits.
split-lookback connector.iceberg.split-file-open-cost read.split.planning-lookback 10 Number of bins to consider when combining input splits.
split-file-open-cost connector.iceberg.split-file-open-cost read.split.open-file-cost 4MB The estimated cost to open a file, used as a minimum weight when combining splits.
streaming connector.iceberg.streaming N/A false Sets whether the current task runs in streaming or batch mode.
monitor-interval connector.iceberg.monitor-interval N/A 60s Monitor interval to discover splits from new snapshots. Applicable only for streaming read.
include-column-stats connector.iceberg.include-column-stats N/A false Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds.
max-planning-snapshot-count connector.iceberg.max-planning-snapshot-count N/A Integer.MAX_VALUE Max number of snapshots limited per split enumeration. Applicable only to streaming read.
limit connector.iceberg.limit N/A -1 Limited output number of rows.
max-allowed-planning-failures connector.iceberg.max-allowed-planning-failures N/A 3 Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure.
watermark-column connector.iceberg.watermark-column N/A null Specifies the watermark column to use for watermark generation. If this option is present, the splitAssignerFactory will be overridden with OrderedSplitAssignerFactory.
watermark-column-time-unit connector.iceberg.watermark-column-time-unit N/A TimeUnit.MICROSECONDS Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS.

Write options🔗

Flink write options are passed when configuring the FlinkSink, like this:

FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");

For Flink SQL, write options can be passed in via SQL hints like this:

INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
Flink option Default Description
write-format Table write.format.default File format to use for this write operation; parquet, avro, or orc
target-file-size-bytes As per table property Overrides this table's write.target-file-size-bytes
upsert-enabled Table write.upsert.enabled Overrides this table's write.upsert.enabled
overwrite-enabled false Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream.
distribution-mode Table write.distribution-mode Overrides this table's write.distribution-mode
compression-codec Table write.(fileformat).compression-codec Overrides this table's compression codec for this write
compression-level Table write.(fileformat).compression-level Overrides this table's compression level for Parquet and Avro tables for this write
compression-strategy Table write.orc.compression-strategy Overrides this table's compression strategy for ORC tables for this write
write-parallelism Upstream operator parallelism Overrides the writer parallelism