Flink Configuration
Catalog Configuration
A catalog is created and named by executing the following query (replace <catalog_name> with your catalog name and <config_key>=<config_value> with the catalog implementation config):
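The statement follows the Flink SQL shape `CREATE CATALOG <catalog_name> WITH ('type'='iceberg', '<config_key>'='<config_value>')`. As a minimal sketch, the Java helper below assembles such a statement from a map of properties. `CatalogDdl` and its methods are hypothetical names introduced for illustration, not part of Iceberg or Flink, and the catalog name and URI in the usage note are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not an Iceberg or Flink API) that renders a CREATE CATALOG statement. */
final class CatalogDdl {
    private CatalogDdl() {}

    /** Quotes a value as a SQL string literal, doubling embedded single quotes. */
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    /** Renders the DDL; 'type'='iceberg' is always emitted first because it is required. */
    static String createCatalog(String catalogName, Map<String, String> config) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("type", "iceberg");
        // Copy the remaining keys in caller order, ignoring any attempt to override 'type'.
        config.forEach((key, value) -> {
            if (!"type".equals(key)) {
                props.put(key, value);
            }
        });
        String body = props.entrySet().stream()
            .map(e -> "  " + quote(e.getKey()) + "=" + quote(e.getValue()))
            .collect(Collectors.joining(",\n"));
        return "CREATE CATALOG " + catalogName + " WITH (\n" + body + "\n);";
    }
}
```

Calling `CatalogDdl.createCatalog("hive_catalog", config)` with `catalog-type=hive` and a placeholder `uri` yields a statement that could then be submitted through Flink's `TableEnvironment.executeSql`.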
The following properties can be set globally and are not limited to a specific catalog implementation:
Property | Required | Values | Description |
---|---|---|---|
type | ✔️ | iceberg | Must be iceberg. |
catalog-type |  | hive, hadoop, rest, glue, jdbc or nessie | The underlying Iceberg catalog implementation: HiveCatalog, HadoopCatalog, RESTCatalog, GlueCatalog, JdbcCatalog or NessieCatalog. Leave unset if using a custom catalog implementation via catalog-impl. |
catalog-impl |  |  | The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset. |
property-version |  |  | Version number describing the property format. It can be used for backwards compatibility in case the property format changes. The current property version is 1. |
cache-enabled |  | true or false | Whether to enable the catalog cache. The default value is true. |
cache.expiration-interval-ms |  |  | How long catalog entries are cached locally, in milliseconds. Negative values such as -1 disable expiration; 0 is not allowed. The default value is -1. |
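The constraints in this table can be checked before a catalog is created. The sketch below is one way to do that in plain Java. `CatalogPropertyCheck` is a hypothetical class, not something Iceberg provides, and it only encodes the rules stated above: `type` must be `iceberg`, `catalog-impl` is needed when `catalog-type` is unset, and `cache.expiration-interval-ms` must not be 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check (not part of Iceberg) for the global catalog properties. */
final class CatalogPropertyCheck {
    private CatalogPropertyCheck() {}

    /** Returns human-readable problems; an empty list means every rule below passed. */
    static List<String> validate(Map<String, String> props) {
        List<String> problems = new ArrayList<>();
        if (!"iceberg".equals(props.get("type"))) {
            problems.add("type must be iceberg");
        }
        // catalog-impl is required whenever catalog-type is left unset.
        if (props.get("catalog-type") == null && props.get("catalog-impl") == null) {
            problems.add("set catalog-type or catalog-impl");
        }
        // Negative values disable expiration; 0 is not allowed.
        String interval = props.get("cache.expiration-interval-ms");
        if (interval != null) {
            try {
                if (Long.parseLong(interval.trim()) == 0L) {
                    problems.add("cache.expiration-interval-ms must not be 0");
                }
            } catch (NumberFormatException e) {
                problems.add("cache.expiration-interval-ms must be a whole number of milliseconds");
            }
        }
        return problems;
    }
}
```

Returning a list rather than throwing on the first failure is a design choice of this sketch, so that all misconfigured properties can be reported at once.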
The following properties can be set if using the Hive catalog:
Property | Required | Values | Description |
---|---|---|---|
uri | ✔️ |  | The Hive metastore's thrift URI. |
clients |  |  | The Hive metastore client pool size. The default value is 2. |
warehouse |  |  | The Hive warehouse location. Specify this path if you neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath. |
hive-conf-dir |  |  | Path to a directory containing a hive-site.xml configuration file, which is used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are set when creating the Iceberg catalog, the value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath) is overwritten with the warehouse value. |
hadoop-conf-dir |  |  | Path to a directory containing core-site.xml and hdfs-site.xml configuration files, which are used to provide custom Hadoop configuration values. |
The following properties can be set if using the Hadoop catalog:
Property | Required | Values | Description |
---|---|---|---|
warehouse | ✔️ |  | The HDFS directory to store metadata files and data files. |
The following properties can be set if using the REST catalog:
Property | Required | Values | Description |
---|---|---|---|
uri | ✔️ |  | The URL to the REST Catalog. |
credential |  |  | A credential to exchange for a token in the OAuth2 client credentials flow. |
token |  |  | A token which will be used to interact with the server. |
Runtime configuration
Read options
Flink read options are passed when configuring the Flink IcebergSource:
```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
    .startSnapshotId(3821550127947089987L)
    // Equivalent: .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .monitorInterval(Duration.ofSeconds(10))
    .build()
```
For Flink SQL, read options can be passed in via SQL hints like this:
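Flink SQL hints take the form `/*+ OPTIONS('key'='value') */` placed after the table name. As a rough illustration, the Java sketch below renders a `SELECT` carrying read options as a hint. `ReadHints` is a hypothetical helper and `tableName` is a placeholder; only the hint syntax and the option keys come from Flink and the table of read options.

```java
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not an Iceberg or Flink API) that attaches read options as a SQL hint. */
final class ReadHints {
    private ReadHints() {}

    /** Renders SELECT * FROM <table> with an OPTIONS hint built from the given read options. */
    static String selectWithOptions(String table, Map<String, String> options) {
        // Keys and values are emitted verbatim, so they must not contain single quotes.
        String hints = options.entrySet().stream()
            .map(e -> "'" + e.getKey() + "'='" + e.getValue() + "'")
            .collect(Collectors.joining(", "));
        return "SELECT * FROM " + table + " /*+ OPTIONS(" + hints + ") */";
    }
}
```

For example, passing `monitor-interval=10s` for `tableName` produces `SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */`.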
Options can also be passed in via Flink configuration, in which case they apply to the current session. Note that not all options support this mode.
Read option has the highest priority, followed by Flink configuration and then Table property.
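The precedence rule can be sketched as a simple lookup chain. The class below is a hypothetical illustration, not how Iceberg implements it internally: it checks the read option first, then the Flink configuration key, then the table property, and finally falls back to the default. A `null` key stands in for the N/A entries in the table.

```java
import java.util.Map;

/** Hypothetical resolver illustrating: read option > Flink configuration > table property > default. */
final class OptionResolver {
    private OptionResolver() {}

    static String resolve(
            String readKey, Map<String, String> readOptions,
            String flinkKey, Map<String, String> flinkConf,
            String tableKey, Map<String, String> tableProps,
            String defaultValue) {
        // 1. A read option set on the source or in a SQL hint wins.
        String value = readOptions.get(readKey);
        // 2. Otherwise use the session-level Flink configuration, if the option supports it.
        if (value == null && flinkKey != null) {
            value = flinkConf.get(flinkKey);
        }
        // 3. Otherwise use the table property, if one exists for this option.
        if (value == null && tableKey != null) {
            value = tableProps.get(tableKey);
        }
        // 4. Fall back to the documented default.
        return value != null ? value : defaultValue;
    }
}
```

With `split-size`, `connector.iceberg.split-size` and `read.split.target-size` as the three keys, a value supplied as a read option overrides whatever the session configuration or the table property says.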
Read option | Flink configuration | Table property | Default | Description |
---|---|---|---|---|
snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column names case-sensitively. |
as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode. The incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive. If the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive. If the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. If the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Applies only to the FLIP-27 source. |
start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
branch | N/A | N/A | main | Specifies the branch to read from in batch mode. |
tag | N/A | N/A | null | Specifies the tag to read from in batch mode. |
start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads. |
end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads. |
split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
include-column-stats | connector.iceberg.include-column-stats | N/A | false | Whether the scan loads column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Maximum number of snapshots processed per split enumeration. Applicable only to streaming read. |
limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows. |
max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Maximum allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failures. |
watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the splitAssignerFactory will be overridden with OrderedSplitAssignerFactory. |
watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |
Write options
Flink write options are passed when configuring the FlinkSink, like this:
```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
For Flink SQL, write options can be passed in via SQL hints like this:
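For writes, the `/*+ OPTIONS(...) */` hint follows the target table of the `INSERT INTO` statement. The Java sketch below renders such a statement and, as an extra guard, refuses to combine `overwrite-enabled` with `upsert-enabled`, since the write options advise against enabling overwrite for an UPSERT stream. `WriteHints`, the table names, and the decision to throw an exception are assumptions made for illustration.

```java
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not an Iceberg or Flink API) that attaches write options as a SQL hint. */
final class WriteHints {
    private WriteHints() {}

    /** Renders INSERT INTO <table> /*+ OPTIONS(...) *&#47; <select>, rejecting overwrite plus upsert. */
    static String insertWithOptions(String table, Map<String, String> options, String selectSql) {
        // Boolean.parseBoolean treats a missing (null) value as false.
        boolean upsert = Boolean.parseBoolean(options.get("upsert-enabled"));
        boolean overwrite = Boolean.parseBoolean(options.get("overwrite-enabled"));
        if (upsert && overwrite) {
            throw new IllegalArgumentException(
                "overwrite-enabled should not be combined with upsert-enabled");
        }
        String hints = options.entrySet().stream()
            .map(e -> "'" + e.getKey() + "'='" + e.getValue() + "'")
            .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " /*+ OPTIONS(" + hints + ") */ " + selectSql;
    }
}
```

For example, `WriteHints.insertWithOptions("tableName", Map.of("upsert-enabled", "true"), "SELECT * FROM src")` renders an upsert write into `tableName`, where `src` is a placeholder source table.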
Flink option | Default | Description |
---|---|---|
write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
overwrite-enabled | false | Overwrite the table's data. Overwrite mode should not be enabled when the sink is configured to use an UPSERT data stream. |
distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |