Flink

Apache Iceberg supports both Apache Flink’s DataStream API and Table API. See the Multi-Engine Support#apache-flink page for details on the Apache Flink integration.

Feature support | Flink | Notes
SQL create catalog | ✔️ |
SQL create database | ✔️ |
SQL create table | ✔️ |
SQL create table like | ✔️ |
SQL alter table | ✔️ | Only altering table properties is supported; column and partition changes are not supported
SQL drop_table | ✔️ |
SQL select | ✔️ | Supports both streaming and batch mode
SQL insert into | ✔️ | Supports both streaming and batch mode
SQL insert overwrite | ✔️ |
DataStream read | ✔️ |
DataStream append | ✔️ |
DataStream overwrite | ✔️ |
Metadata tables | | Supports the Java API but does not support Flink SQL
Rewrite files action | ✔️ |

To create an Iceberg table in Flink, it is recommended to use the Flink SQL Client, as it makes the concepts easier for users to understand.

Download Flink from the Apache download page. Iceberg uses Scala 2.12 when compiling the Apache iceberg-flink-runtime jar, so it’s recommended to use Flink 1.16 bundled with Scala 2.12.

FLINK_VERSION=1.16.1
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz

Start a standalone Flink cluster within a Hadoop environment:

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
HADOOP_VERSION=2.8.5
wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz
HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

Start the Flink SQL client. There is a separate flink-runtime module in the Iceberg project to generate a bundled jar, which can be loaded by the Flink SQL client directly. To build the flink-runtime bundled jar manually, build the Iceberg project and it will generate the jar under <iceberg-root-dir>/flink-runtime/build/libs. Or download the flink-runtime jar from the Apache repository.

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`   

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.16-1.2.0.jar shell

By default, Iceberg ships with Hadoop jars for the Hadoop catalog. To use the Hive catalog, load the Hive jars when opening the Flink SQL client. Fortunately, Flink provides a bundled Hive jar for the SQL client. An example of how to download the dependencies and get started:

# HADOOP_HOME is your hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

ICEBERG_VERSION=1.2.0
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
ICEBERG_PACKAGE=iceberg-flink-runtime
# Flink major/minor version used in the runtime artifact name, e.g. iceberg-flink-runtime-1.16
FLINK_VERSION_MAJOR=1.16
wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar -P lib/

HIVE_VERSION=2.3.9
SCALA_VERSION=2.12
FLINK_VERSION=1.16.1
FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar

./bin/sql-client.sh embedded shell

Install the Apache Flink dependency using pip:

pip install apache-flink==1.16.1

Provide a file:// path to the iceberg-flink-runtime jar, which can be obtained by building the project and looking at <iceberg-root-dir>/flink-runtime/build/libs, or downloading it from the Apache official repository. Third-party jars can be added to pyflink via:

  • env.add_jars("file:///my/jar/path/connector.jar")
  • table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar")

This is also mentioned in the official docs. The example below uses env.add_jars(..):

import os

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.16-1.2.0.jar")

env.add_jars("file://{}".format(iceberg_flink_runtime_jar))

Next, create a StreamTableEnvironment and execute Flink SQL statements. The below example shows how to create a custom catalog via the Python Table API:

from pyflink.table import StreamTableEnvironment
table_env = StreamTableEnvironment.create(env)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='com.my.custom.CatalogImpl',
    'my-additional-catalog-config'='my-value'
)
""")

Run a query:

(table_env
    .sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5")
    .execute()
    .print()) 
+----+----------------------+----------------------+--------------------------------+
| op |         PULocationID |         DOLocationID |                passenger_count |
+----+----------------------+----------------------+--------------------------------+
| +I |                  249 |                   48 |                            1.0 |
| +I |                  132 |                  233 |                            1.0 |
| +I |                  164 |                  107 |                            1.0 |
| +I |                   90 |                  229 |                            1.0 |
| +I |                  137 |                  249 |                            1.0 |
+----+----------------------+----------------------+--------------------------------+
5 rows in set

For more details, please refer to the Python Table API.

Creating catalogs and using catalogs

Flink supports creating catalogs using Flink SQL.

Catalog Configuration

A catalog is created and named by executing the following query (replace <catalog_name> with your catalog name and <config_key>=<config_value> with catalog implementation config):

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);

The following properties can be set globally and are not limited to a specific catalog implementation:

  • type: Must be iceberg. (required)
  • catalog-type: hive, hadoop or rest for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
  • catalog-impl: The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset. (Optional)
  • property-version: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1. (Optional)
  • cache-enabled: Whether to enable catalog cache, default value is true. (Optional)
  • cache.expiration-interval-ms: How long catalog entries are locally cached, in milliseconds; negative values such as -1 disable expiration, and 0 is not allowed. The default value is -1. (Optional) A sketch that tunes the cache-related properties follows this list.
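
As a hedged sketch of the cache-related properties above, assuming a Hadoop catalog with an illustrative name and warehouse path:

CREATE CATALOG cached_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'cache-enabled'='true',
  'cache.expiration-interval-ms'='300000'
);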

Hive catalog

This creates an Iceberg catalog named hive_catalog that can be configured using 'catalog-type'='hive', which loads tables from Hive metastore:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

The following properties can be set if using the Hive catalog:

  • uri: The Hive metastore’s thrift URI. (Required)
  • clients: The Hive metastore client pool size, default value is 2. (Optional)
  • warehouse: The Hive warehouse location. Users should specify this path if they neither set hive-conf-dir to a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath.
  • hive-conf-dir: Path to a directory containing a hive-site.xml configuration file, which will be used to provide custom Hive configuration values. If both hive-conf-dir and warehouse are specified when creating the Iceberg catalog, the warehouse value overrides hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or the hive-site.xml on the classpath).
  • hadoop-conf-dir: Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values. A sketch using these configuration directories follows this list.
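
As a hedged sketch, a Hive catalog that picks up local Hive and Hadoop configuration directories might look like the following (the directory paths are illustrative, not defaults); per the note above, the warehouse value overrides hive.metastore.warehouse.dir from the provided hive-site.xml:

CREATE CATALOG hive_catalog_with_conf WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'hive-conf-dir'='/etc/hive/conf',
  'hadoop-conf-dir'='/etc/hadoop/conf',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);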

Hadoop catalog

Iceberg also supports a directory-based catalog in HDFS that can be configured using 'catalog-type'='hadoop':

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);

The following properties can be set if using the Hadoop catalog:

  • warehouse: The HDFS directory to store metadata files and data files. (Required)

Execute the SQL command USE CATALOG hadoop_catalog to set the current catalog.
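
For example, to make the catalog created above the current one:

USE CATALOG hadoop_catalog;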

REST catalog

This creates an Iceberg catalog named rest_catalog that can be configured using 'catalog-type'='rest', which loads tables from a REST catalog:

CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);

The following properties can be set if using the REST catalog; a sketch with an OAuth2 credential follows this list:

  • uri: The URL to the REST Catalog (Required)
  • credential: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)
  • token: A token which will be used to interact with the server (Optional)
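
As a hedged sketch, the optional OAuth2 credential can be supplied at catalog creation time (the URI and credential value below are placeholders):

CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/',
  'credential'='<credential>'
);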

Custom catalog

Flink also supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:

CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);

Create through YAML config

Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client.

catalogs: 
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path

Create through SQL Files

The Flink SQL Client supports the -i startup option to execute an initialization SQL file to set up the environment when starting the SQL Client.

-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

USE CATALOG hive_catalog;

Use the -i <init.sql> option to initialize the SQL Client session:

/path/to/bin/sql-client.sh -i /path/to/init.sql

DDL commands

CREATE DATABASE

By default, Iceberg will use the default database in Flink. Use the following example to create a separate database in order to avoid creating tables under the default database:

CREATE DATABASE iceberg_db;
USE iceberg_db;

CREATE TABLE

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

Table create commands support the commonly used Flink create clauses including:

  • PARTITIONED BY (column1, column2, ...) to configure partitioning; Flink does not yet support hidden partitioning.
  • COMMENT 'table document' to set a table description.
  • WITH ('key'='value', ...) to set table configuration which will be stored in Iceberg table properties.

Currently, it does not support computed columns, primary keys, or watermark definitions. A sketch combining the supported clauses follows.
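
As a hedged sketch that combines the clauses above (the table name and property values are illustrative):

CREATE TABLE `hive_catalog`.`default`.`sample_with_clauses` (
    id BIGINT COMMENT 'unique id',
    data STRING
) COMMENT 'sample table with create clauses'
PARTITIONED BY (data)
WITH ('format-version'='2', 'write.format.default'='parquet');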

PARTITIONED BY

To create a partition table, use PARTITIONED BY:

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Iceberg supports hidden partitioning, but Flink does not support partitioning by a function on columns, so there is no way to define hidden partitions in Flink DDL.

CREATE TABLE LIKE

To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;

For more details, refer to the Flink CREATE TABLE documentation.

ALTER TABLE

Iceberg only supports altering table properties:

ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro')

ALTER TABLE .. RENAME TO

ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;

DROP TABLE

To delete a table, run:

DROP TABLE `hive_catalog`.`default`.`sample`;

Querying with SQL

Iceberg supports both streaming and batch reads in Flink. Execute the following SQL commands to switch the execution mode between streaming and batch:

-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;

Submit a Flink batch job using the following statements:

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;

Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot-id:

-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because the streaming read SQL below provides a few job options via Flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

There are other options that can be set in Flink SQL hint options for streaming jobs; see read options for details.

FLIP-27 source for SQL

Here are the SQL settings for the FLIP-27 source. All other SQL settings and options documented above are applicable to the FLIP-27 source.

-- Opt in the FLIP-27 source. Default is false.
SET table.exec.iceberg.use-flip27-source = true;

Writing with SQL

Iceberg supports both INSERT INTO and INSERT OVERWRITE.

INSERT INTO

To append new data to a table with a Flink streaming job, use INSERT INTO:

INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;

INSERT OVERWRITE

To replace data in the table with the result of a query, use INSERT OVERWRITE in a batch job (Flink streaming jobs do not support INSERT OVERWRITE). Overwrites are atomic operations for Iceberg tables.

Partitions that have rows produced by the SELECT query will be replaced, for example:

INSERT OVERWRITE sample VALUES (1, 'a');

Iceberg also supports overwriting given partitions with the select values:

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;

For a partitioned Iceberg table, when all of the partition columns are given values in the PARTITION clause, the statement inserts into a static partition. If only a prefix of the partition columns is given values in the PARTITION clause, the query result is written into a dynamic partition. For an unpartitioned Iceberg table, its data is completely overwritten by INSERT OVERWRITE. The sketch below illustrates the static and dynamic cases with a hypothetical table.
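
As a hedged sketch, assume a hypothetical table sample_multi_part with columns (id, data, dt, hour) that is partitioned by (dt, hour), and a placeholder source table another_table:

-- Static partition overwrite: every partition column is given a value in the PARTITION clause.
INSERT OVERWRITE sample_multi_part PARTITION (dt='2023-01-01', hour='10') SELECT id, data FROM another_table;

-- Dynamic partition overwrite: only the prefix partition column dt is fixed; hour comes from the query result.
INSERT OVERWRITE sample_multi_part PARTITION (dt='2023-01-01') SELECT id, data, hour FROM another_table;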

UPSERT

Iceberg supports UPSERT based on the primary key when writing data into v2 table format. There are two ways to enable upsert.

  1. Enable the UPSERT mode as table-level property write.upsert.enabled. Here is an example SQL statement to set the table property when creating a table. It would be applied for all write paths to this table (batch or streaming) unless overwritten by write options as described later.
CREATE TABLE `hive_catalog`.`default`.`sample` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');
  2. Enable UPSERT mode using upsert-enabled in the write options, which provides more flexibility than a table-level config. Note that you still need to use the v2 table format and specify the primary key when creating the table.
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...

OVERWRITE and UPSERT can’t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.

Reading with DataStream

Iceberg supports streaming and batch reads via the Java API.

Batch Read

This example will read all records from the Iceberg table and then print them to the stdout console in a Flink batch job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming read

This example will read incremental records starting from snapshot-id ‘3821550127947089987’ and print them to the stdout console in a Flink streaming job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(true)
     .startSnapshotId(3821550127947089987L)
     .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");

There are other options that can be set; please see FlinkSource#Builder.

Reading with DataStream (FLIP-27 source)

The FLIP-27 source interface was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction streaming source interface. It also unifies the source interfaces for both batch and streaming executions. Most source connectors (like Kafka, file) in the Flink repo have migrated to the FLIP-27 interface. Flink is planning to deprecate the old SourceFunction interface in the near future.

A FLIP-27 based Flink IcebergSource is added in the iceberg-flink module. The FLIP-27 IcebergSource is currently an experimental feature.

Batch Read

This example will read all records from the Iceberg table and then print them to the stdout console in a Flink batch job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();

DataStream<RowData> batch = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming read

This example will start the streaming read from the latest table snapshot (inclusive). Every 60s, it polls the Iceberg table to discover new append-only snapshots. CDC read is not supported yet.

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");

There are other options that can be set via the Java API; please see IcebergSource#Builder.

Read as Avro GenericRecord

The FLIP-27 Iceberg source provides AvroGenericRecordReaderFunction, which converts Flink RowData to Avro GenericRecord. You can use the converter to read from an Iceberg table as an Avro GenericRecord DataStream.

Please make sure the flink-avro jar is included in the classpath. Also, the iceberg-flink-runtime shaded bundle jar can’t be used because the runtime jar shades the Avro package. Please use the non-shaded iceberg-flink jar instead.

TableLoader tableLoader = ...;
Table table;
try (TableLoader loader = tableLoader) {
    loader.open();
    table = loader.loadTable();
}

AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);

IcebergSource<GenericRecord> source =
    IcebergSource.<GenericRecord>builder()
        .tableLoader(tableLoader)
        .readerFunction(readerFunction)
        .assignerFactory(new SimpleSplitAssignerFactory())
        ...
        .build();

DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
    "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));

Writing with DataStream

Iceberg supports writing to an Iceberg table from different DataStream inputs.

Appending data

Flink supports writing DataStream<RowData> and DataStream<Row> to the sink Iceberg table natively.

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();

env.execute("Test Iceberg DataStream");

The Iceberg API also allows users to write a generic DataStream<T> to an Iceberg table; more examples can be found in this unit test.

Overwrite data

Set the overwrite flag in the FlinkSink builder to overwrite the data in existing Iceberg tables:

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .append();

env.execute("Test Iceberg DataStream");

Upsert data

Set the upsert flag in the FlinkSink builder to upsert the data in an existing Iceberg table. The table must use the v2 table format and have a primary key.

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .append();

env.execute("Test Iceberg DataStream");

OVERWRITE and UPSERT can’t be set together. In UPSERT mode, if the table is partitioned, the partition fields should be included in equality fields.

Write with Avro GenericRecord

The Flink Iceberg sink provides AvroGenericRecordToRowDataMapper, which converts Avro GenericRecord to Flink RowData. You can use the mapper to write an Avro GenericRecord DataStream to Iceberg.

Please make sure the flink-avro jar is included in the classpath. Also, the iceberg-flink-runtime shaded bundle jar can’t be used because the runtime jar shades the Avro package. Please use the non-shaded iceberg-flink jar instead.

DataStream<org.apache.avro.generic.GenericRecord> dataStream = ...;

Schema icebergSchema = table.schema();


// The Avro schema converted from Iceberg schema can't be used
// due to precision difference between how Iceberg schema (micro)
// and Flink AvroToRowDataConverters (milli) deal with time type.
// Instead, use the Avro schema defined directly.
// See AvroGenericRecordToRowDataMapper Javadoc for more details.
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, table.name());

GenericRecordAvroTypeInfo avroTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
RowType rowType = FlinkSchemaUtil.convert(icebergSchema);

FlinkSink.builderFor(
    dataStream,
    AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema),
    FlinkCompatibilityUtil.toTypeInfo(rowType))
  .table(table)
  .tableLoader(tableLoader)
  .append();

Metrics

The following Flink metrics are provided by the Flink Iceberg sink.

Parallel writer metrics are added under the sub group of IcebergStreamWriter. They should have the following key-value tags.

  • table: full table name (like iceberg.my_db.my_table)
  • subtask_index: writer subtask index starting from 0

Metric name | Metric type | Description
lastFlushDurationMs | Gauge | The duration (in milliseconds) that writer subtasks take to flush and upload the files during checkpoint.
flushedDataFiles | Counter | Number of data files flushed and uploaded.
flushedDeleteFiles | Counter | Number of delete files flushed and uploaded.
flushedReferencedDataFiles | Counter | Number of data files referenced by the flushed delete files.
dataFilesSizeHistogram | Histogram | Histogram distribution of data file sizes (in bytes).
deleteFilesSizeHistogram | Histogram | Histogram distribution of delete file sizes (in bytes).

Committer metrics are added under the sub group of IcebergFilesCommitter. They should have the following key-value tags.

  • table: full table name (like iceberg.my_db.my_table)

Metric name | Metric type | Description
lastCheckpointDurationMs | Gauge | The duration (in milliseconds) that the committer operator checkpoints its state.
lastCommitDurationMs | Gauge | The duration (in milliseconds) that the Iceberg table commit takes.
committedDataFilesCount | Counter | Number of data files committed.
committedDataFilesRecordCount | Counter | Number of records contained in the committed data files.
committedDataFilesByteCount | Counter | Number of bytes contained in the committed data files.
committedDeleteFilesCount | Counter | Number of delete files committed.
committedDeleteFilesRecordCount | Counter | Number of records contained in the committed delete files.
committedDeleteFilesByteCount | Counter | Number of bytes contained in the committed delete files.
elapsedSecondsSinceLastSuccessfulCommit | Gauge | Elapsed time (in seconds) since the last successful Iceberg commit.

elapsedSecondsSinceLastSuccessfulCommit is an ideal alerting metric to detect failed or missing Iceberg commits.

  • The Iceberg commit happens after a successful Flink checkpoint in the notifyCheckpointComplete callback. It could happen that Iceberg commits fail (for whatever reason) while Flink checkpoints succeed.
  • It could also happen that notifyCheckpointComplete wasn’t triggered (due to some bug). As a result, no Iceberg commits would be attempted.

If the checkpoint interval (and expected Iceberg commit interval) is 5 minutes, set up an alert with a rule like elapsedSecondsSinceLastSuccessfulCommit > 60 minutes to detect failed or missing Iceberg commits in the past hour.

Options

Read options

Flink read options are passed when configuring the Flink IcebergSource:

IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()

For Flink SQL, read options can be passed in via SQL hints like this:

SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...

Options can also be passed in via the Flink configuration, which will be applied to the current session. Note that not all options support this mode.

env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...

Read options have the highest priority, followed by Flink configuration and then table properties.

Read option | Flink configuration | Table property | Default | Description
snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id.
case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way.
as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds.
starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution. TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode; the incremental mode starts from the current snapshot exclusive. INCREMENTAL_FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive; if it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive; if it is an empty map, all future append snapshots should be discovered. INCREMENTAL_FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive. INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive; if the timestamp is between two snapshots, it should start from the snapshot after the timestamp. Only applies to the FLIP-27 source.
start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds.
start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id.
end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot.
split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits.
split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits.
split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4MB | The estimated cost to open a file, used as a minimum weight when combining splits.
streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode.
monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read.
include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds.
max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read.
limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows.

Write options

Flink write options are passed when configuring the FlinkSink, like this:

FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");

For Flink SQL, write options can be passed in via SQL hints like this:

INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...

Flink option | Default | Description
write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc
target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes
upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled
overwrite-enabled | false | Overwrite the table's data; overwrite mode should not be enabled when configuring to use UPSERT data stream.
distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode
compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write
compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write
compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write

Inspecting tables

To inspect a table’s history, snapshots, and other metadata, Iceberg supports metadata tables.

Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table$history.

History

To show table history:

SELECT * FROM prod.db.table$history;

made_current_at | snapshot_id | parent_id | is_current_ancestor
2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true
2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true
2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false
2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true
2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true
2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true

This shows a commit that was rolled back. In this example, snapshots 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state.

Metadata Log Entries

To show table metadata log entries:

SELECT * from prod.db.table$metadata_log_entries;

timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number
2022-07-28 10:43:52.93 | s3://…/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null
2022-07-28 10:43:57.487 | s3://…/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1
2022-07-28 10:43:58.25 | s3://…/table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2

Snapshots

To show the valid snapshots for a table:

SELECT * FROM prod.db.table$snapshots;

committed_at | snapshot_id | parent_id | operation | manifest_list | summary
2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://…/table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 }

You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['flink.job-id']
from prod.db.table$history h
join prod.db.table$snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at

made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id]
2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813

Files

To show a table’s current data files:

SELECT * FROM prod.db.table$files;

content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id
0 | s3:/…/table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null
0 | s3:/…/table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null
0 | s3:/…/table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null

Manifests

To show a table’s current file manifests:

SELECT * FROM prod.db.table$manifests;

path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries
s3://…/table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]]

Note:

  1. Fields within the partition_summaries column of the manifests table correspond to the field_summary structs within the manifest list, in the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file’s metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.

Partitions

To show a table’s current partitions:

SELECT * FROM prod.db.table$partitions;

partition | record_count | file_count | spec_id
{20211001, 11} | 1 | 1 | 0
{20211002, 11} | 1 | 1 | 0
{20211001, 10} | 1 | 1 | 0
{20211002, 10} | 1 | 1 | 0

Note: For unpartitioned tables, the partitions table will contain only the record_count and file_count columns.

All Metadata Tables

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

The “all” metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.

All Data Files

To show all of the table’s data files and each file’s metadata:

SELECT * FROM prod.db.table$all_data_files;

content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id
0 | s3://…/dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0
0 | s3://…/dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0
0 | s3://…/dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0

All Manifests

To show all of the table’s manifest files:

SELECT * FROM prod.db.table$all_manifests;

path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries
s3://…/metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}]

Note:

  1. Fields within the partition_summaries column of the manifests table correspond to the field_summary structs within the manifest list, in the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file’s metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.

References

To show a table’s known snapshot references:

SELECT * FROM prod.db.table$refs;

name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms
main | BRANCH | 4686954189838128572 | 10 | 20 | 30
testTag | TAG | 4686954189838128572 | 10 | null | null

Rewrite files action

Iceberg provides an API to rewrite small files into large files by submitting a Flink batch job. The behavior of this Flink action is the same as Spark’s rewriteDataFiles.

import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
        .rewriteDataFiles()
        .execute();

For more information about the options of the rewrite files action, please see RewriteDataFilesAction.

Type conversion

Iceberg’s integration for Flink automatically converts between Flink and Iceberg types. When writing to a table with types that are not supported by Flink, like UUID, Iceberg will accept and convert values from the Flink type.

Flink types are converted to Iceberg types according to the following table:

Flink | Iceberg | Notes
boolean | boolean |
tinyint | integer |
smallint | integer |
integer | integer |
bigint | long |
float | float |
double | double |
char | string |
varchar | string |
string | string |
binary | binary |
varbinary | fixed |
decimal | decimal |
date | date |
time | time |
timestamp | timestamp without timezone |
timestamp_ltz | timestamp with timezone |
array | list |
map | map |
multiset | map |
row | struct |
raw | | Not supported
interval | | Not supported
structured | | Not supported
timestamp with zone | | Not supported
distinct | | Not supported
null | | Not supported
symbol | | Not supported
logical | | Not supported

Iceberg types are converted to Flink types according to the following table:

Iceberg | Flink
boolean | boolean
struct | row
list | array
map | map
integer | integer
long | bigint
float | float
double | double
date | date
time | time
timestamp without timezone | timestamp(6)
timestamp with timezone | timestamp_ltz(6)
string | varchar(2147483647)
uuid | binary(16)
fixed(N) | binary(N)
binary | varbinary(2147483647)
decimal(P, S) | decimal(P, S)

Future improvements

There are some features that are not yet supported in the current Flink Iceberg integration work:

  • Creating an Iceberg table with hidden partitioning is not supported. See the discussion on the Flink mailing list.
  • Creating an Iceberg table with computed columns is not supported.
  • Creating an Iceberg table with watermarks is not supported.
  • Adding, removing, renaming, or changing columns is not supported. FLINK-19062 is tracking this.