Flink Configuration
Catalog Configuration
A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and `<config_key>`=`<config_value>` with catalog implementation config):
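A sketch of the statement shape; the placeholders must be replaced with real values and are not runnable as-is:

```sql
-- <catalog_name>, <config_key> and <config_value> are placeholders to substitute
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);
```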
The following properties can be set globally and are not limited to a specific catalog implementation:
Property | Required | Values | Description |
---|---|---|---|
type | ✔️ | iceberg | Must be `iceberg`. |
catalog-type | | `hive`, `hadoop`, `rest`, `glue`, `jdbc` or `nessie` | The underlying Iceberg catalog implementation, `HiveCatalog`, `HadoopCatalog`, `RESTCatalog`, `GlueCatalog`, `JdbcCatalog`, `NessieCatalog`, or left unset if using a custom catalog implementation via `catalog-impl`. |
catalog-impl | | | The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. |
property-version | | | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. |
cache-enabled | | `true` or `false` | Whether to enable catalog cache. Default value is `true`. |
cache.expiration-interval-ms | | | How long catalog entries are locally cached, in milliseconds; negative values like `-1` disable expiration, and `0` is not allowed. Default value is `-1`. |
The following properties can be set if using the Hive catalog:
Property | Required | Values | Description |
---|---|---|---|
uri | ✔️ | | The Hive metastore's thrift URI. |
clients | | | The Hive metastore client pool size. Default value is `2`. |
warehouse | | | The Hive warehouse location. Users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath. |
hive-conf-dir | | | Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog. |
hadoop-conf-dir | | | Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values. |
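For illustration, a minimal sketch of a Hive-backed catalog; the metastore URI and warehouse path are placeholder values:

```sql
-- placeholder URI and warehouse path; adjust to your environment
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```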
The following properties can be set if using the Hadoop catalog:
Property | Required | Values | Description |
---|---|---|---|
warehouse | ✔️ | | The HDFS directory to store metadata files and data files. |
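A minimal sketch of a Hadoop catalog, using a placeholder HDFS warehouse path:

```sql
-- placeholder warehouse path; adjust to your environment
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```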
The following properties can be set if using the REST catalog:
Property | Required | Values | Description |
---|---|---|---|
uri | ✔️ | | The URL to the REST Catalog. |
credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
token | | | A token which will be used to interact with the server. |
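A minimal sketch of a REST catalog, using a placeholder server URL; `credential` or `token` can be added as needed:

```sql
-- placeholder REST server URL; adjust to your environment
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);
```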
Runtime configuration
Read options
Flink read options are passed when configuring the Flink IcebergSource:
IcebergSource.forRowData()
.tableLoader(TableLoader.fromCatalog(...))
.assignerFactory(new SimpleSplitAssignerFactory())
.streaming(true)
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
.startSnapshotId(3821550127947089987L)
.monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
.build()
For Flink SQL, read options can be passed in via SQL hints like this:
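The table name `sample` below is a placeholder for an Iceberg table in the configured catalog; the option values are illustrative:

```sql
-- read options via a dynamic table options hint; 'sample' is a placeholder table name
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
```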
Options can also be passed in via the Flink configuration, in which case they are applied to the current session. Note that not all options support this mode.
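For example, in the Flink SQL client the session-level keys from the `Flink configuration` column below can be set like this (the values are illustrative):

```sql
-- session-level configuration; keys come from the "Flink configuration" column below
SET 'connector.iceberg.streaming' = 'true';
SET 'connector.iceberg.monitor-interval' = '30s';
```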
`Read option` has the highest priority, followed by `Flink configuration` and then `Table property`.
Read option | Flink configuration | Table property | Default | Description |
---|---|---|---|---|
snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode; the incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive; if the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive; if the table is empty, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive; if the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Only applicable to the FLIP-27 source. |
start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
branch | N/A | N/A | main | Specifies the branch to read from in batch mode. |
tag | N/A | N/A | null | Specifies the tag to read from in batch mode. |
start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads. |
end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads. |
split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
limit | connector.iceberg.limit | N/A | -1 | Limits the number of rows returned. |
max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failure. |
watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the splitAssignerFactory will be overridden with OrderedSplitAssignerFactory . |
watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |
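For example, a batch time-travel read can pin a snapshot via the `snapshot-id` read option (a sketch; the table name and snapshot id are placeholders):

```sql
-- batch time travel: read the table as of a specific snapshot; values are placeholders
SELECT * FROM sample /*+ OPTIONS('snapshot-id'='3821550127947089987') */;
```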
Write options
Flink write options are passed when configuring the FlinkSink, like this:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");
For Flink SQL, write options can be passed in via SQL hints like this:
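The table names `sample` and `source_table` below are placeholders; the option value is illustrative:

```sql
-- write options via a dynamic table options hint; table names are placeholders
INSERT INTO sample /*+ OPTIONS('write-format'='orc') */
SELECT id, data FROM source_table;
```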
Flink option | Default | Description |
---|---|---|
write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
overwrite-enabled | false | Overwrite the table's data; overwrite mode shouldn't be enabled when an UPSERT data stream is configured. |
distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode. RANGE distribution is in experimental status. |
range-distribution-statistics-type | Auto | Range distribution data statistics collection type: Map, Sketch, Auto. See details in the Range distribution statistics type section below. |
range-distribution-sort-key-base-weight | 0.0 (double) | Base weight for every sort key relative to target traffic weight per writer task. See details in the Range distribution sort key base weight section below. |
compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
Range distribution statistics type
The config value is an enum: `Map`, `Sketch`, or `Auto`; a usage sketch follows the list below.
- Map: collects accurate sampling count for every single key. It should be used for low cardinality scenarios (like hundreds or thousands).
- Sketch: constructs a uniform random sampling via reservoir sampling. It fits well for high cardinality scenarios (like millions), as memory footprint is kept low.
- Auto: starts with Map statistics. But if the cardinality is detected to be higher than a threshold (currently 10,000), statistics are automatically switched to Sketch.
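Assuming the option is supplied through a SQL hint on a write (table names are placeholders), the statistics type can be selected together with range distribution:

```sql
-- request RANGE distribution with Sketch statistics; table names are placeholders
INSERT INTO sample /*+ OPTIONS('distribution-mode'='range', 'range-distribution-statistics-type'='Sketch') */
SELECT id, data FROM source_table;
```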
Range distribution sort key base weight
The default value of `range-distribution-sort-key-base-weight` is `0.0`.
If the sort order contains partition columns, each sort key maps to one partition and data file. This relative weight can avoid placing too many small files for sort keys with low traffic. It is a double value that defines the minimal weight for each sort key; `0.02` means each key has a base weight of 2% of the targeted traffic weight per writer task.

For example, assume the sink Iceberg table is partitioned daily by event time and the data stream contains events from now up to 180 days ago. With event time, the traffic weight distribution across different days typically has a long-tail pattern: the current day contains the most traffic, and the older days (the long tail) contain less and less traffic. Assume writer parallelism is 10 and the total weight across all 180 days is 10,000, so the target traffic weight per writer task is 1,000. Assume the weight sum for the oldest 150 days is 1,000. Normally, the range partitioner would put all of the oldest 150 days in one writer task, and that writer task would write 150 small files (one per day). Keeping 150 open files can potentially consume a large amount of memory, and flushing and uploading 150 files (however small) at checkpoint time can also be slow. If this config is set to `0.02`, every sort key has a base weight of 2% of the target weight of 1,000 for every writer task, which essentially avoids placing more than 50 data files (one per day) on one writer task no matter how small they are.
This is only applicable to the Map statistics type, i.e. the low-cardinality scenario. High-cardinality sort columns, for which Sketch statistics are used, are usually not used as partition columns; otherwise, too many partitions and small files may be generated during the write. The sketch range partitioner simply splits high-cardinality keys into ordered ranges.