Skip to content

Flink Queries🔗

Iceberg support streaming and batch read With Apache Flink's DataStream API and Table API.

Reading with SQL🔗

Iceberg support both streaming and batch read in Flink. Execute the following sql command to switch execution mode from streaming to batch, and vice versa:

-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;

Submit a Flink batch job using the following sentences:

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;

Iceberg supports processing incremental data in Flink streaming jobs which starts from a historical snapshot-id:

-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

There are some options that could be set in Flink SQL hint options for streaming job, see read options for details.

FLIP-27 source for SQL🔗

Here is the SQL setting to opt in or out of the FLIP-27 source.

-- Opt out the FLIP-27 source.
-- Default is false for Flink 1.19 and below, and true for Flink 1.20 and above.
SET table.exec.iceberg.use-flip27-source = false;

All other SQL settings and options documented above are applicable to the FLIP-27 source.

Reading branches and tags with SQL🔗

Branch and tags can be read via SQL by specifying options. For more details refer to Flink Configuration

--- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;

--- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */;

--- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */;

Reading with DataStream🔗

Iceberg support streaming or batch read in Java API now.

Batch Read🔗

This example will read all records from iceberg table and then print to the stdout console in flink batch job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming read🔗

This example will read incremental records which start from snapshot-id '3821550127947089987' and print to stdout console in flink streaming job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(true)
     .startSnapshotId(3821550127947089987L)
     .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");

There are other options that can be set, please see the FlinkSource#Builder.

Reading with DataStream (FLIP-27 source)🔗

FLIP-27 source interface was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction streaming source interface. It also unifies the source interfaces for both batch and streaming executions. Most source connectors (like Kafka, file) in Flink repo have migrated to the FLIP-27 interface. Flink is planning to deprecate the old SourceFunction interface in the near future.

A FLIP-27 based Flink IcebergSource is added in iceberg-flink module. The FLIP-27 IcebergSource is currently an experimental feature.

Batch Read🔗

This example will read all records from iceberg table and then print to the stdout console in flink batch job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();

DataStream<RowData> batch = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
batch.print();

// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");

Streaming read🔗

This example will start the streaming read from the latest table snapshot (inclusive). Every 60s, it polls Iceberg table to discover new append-only snapshots. CDC read is not supported yet.

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

IcebergSource source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");

There are other options that could be set by Java API, please see the IcebergSource#Builder.

Reading branches and tags with DataStream🔗

Branches and tags can also be read via the DataStream API

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
// Read from branch
DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .branch("test-branch")
    .streaming(false)
    .build();

// Read from tag
DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .tag("test-tag")
    .streaming(false)
    .build();

// Streaming read from start-tag
DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startTag("test-tag")
    .build();

Read as Avro GenericRecord🔗

FLIP-27 Iceberg source provides AvroGenericRecordReaderFunction that converts Flink RowData Avro GenericRecord. You can use the convert to read from Iceberg table as Avro GenericRecord DataStream.

Please make sure flink-avro jar is included in the classpath. Also iceberg-flink-runtime shaded bundle jar can't be used because the runtime jar shades the avro package. Please use non-shaded iceberg-flink jar instead.

TableLoader tableLoader = ...;
Table table;
try (TableLoader loader = tableLoader) {
    loader.open();
    table = loader.loadTable();
}

AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);

IcebergSource<GenericRecord> source =
    IcebergSource.<GenericRecord>builder()
        .tableLoader(tableLoader)
        .readerFunction(readerFunction)
        .assignerFactory(new SimpleSplitAssignerFactory())
        ...
        .build();

DataStream<Row> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
    "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));

Emitting watermarks🔗

Emitting watermarks from the source itself could be beneficial for several purposes, like harnessing the Flink Watermark Alignment, or prevent triggering windows too early when reading multiple data files concurrently.

Enable watermark generation for an IcebergSource by setting the watermarkColumn. The supported column types are timestamp, timestamptz and long. Iceberg timestamp or timestamptz inherently contains the time precision. So there is no need to specify the time unit. But long type column doesn't contain time unit information. Use
watermarkTimeUnit to configure the conversion for long columns.

The watermarks are generated based on column metrics stored for data files and emitted once per split. If multiple smaller files with different time ranges are combined into a single split, it can increase the out-of-orderliness and extra data buffering in the Flink state. The main purpose of watermark alignment is to reduce out-of-orderliness and excess data buffering in the Flink state. Hence it is recommended to set read.split.open-file-cost to a very large value to prevent combining multiple smaller files into a single split. The negative impact (of not combining small files into a single split) is on read throughput, especially if there are many small files. In typical stateful processing jobs, source read throughput is not the bottleneck. Hence this is probably a reasonable tradeoff.

This feature requires column-level min-max stats. Make sure stats are generated for the watermark column during write phase. By default, the column metrics are collected for the first 100 columns of the table. If watermark column doesn't have stats enabled by default, use write properties starting with write.metadata.metrics when needed.

The following example could be useful if watermarks are used for windowing. The source reads Iceberg data files in order, using a timestamp column and emits watermarks:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Watermark using timestamp column
            .watermarkColumn("timestamp_column")
            .build(),
        // Watermarks are generated by the source, no need to generate it manually
        WatermarkStrategy.<RowData>noWatermarks()
            // Extract event timestamp from records
            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));

Example for reading Iceberg table using a long event column for watermark alignment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream =
    env.fromSource(
        IcebergSource source = IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Disable combining multiple files to a single split 
            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
            // Watermark using long column
            .watermarkColumn("long_column")
            .watermarkTimeUnit(TimeUnit.MILLI_SCALE)
            .build(),
        // Watermarks are generated by the source, no need to generate it manually
        WatermarkStrategy.<RowData>noWatermarks()
            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));

Options🔗

Read options🔗

Flink read options are passed when configuring the Flink IcebergSource:

IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") \ set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()

For Flink SQL, read options can be passed in via SQL hints like this:

SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...

Options can be passed in via Flink configuration, which will be applied to current session. Note that not all options support this mode.

env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...

Check out all the options here: read-options

Inspecting tables🔗

To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.

Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table$history.

History🔗

To show table history:

SELECT * FROM prod.db.table$history;
made_current_at snapshot_id parent_id is_current_ancestor
2019-02-08 03:29:51.215 5781947118336215154 NULL true
2019-02-08 03:47:55.948 5179299526185056830 5781947118336215154 true
2019-02-09 16:24:30.13 296410040247533544 5179299526185056830 false
2019-02-09 16:32:47.336 2999875608062437330 5179299526185056830 true
2019-02-09 19:42:03.919 8924558786060583479 2999875608062437330 true
2019-02-09 19:49:16.343 6536733823181975045 8924558786060583479 true

Info

This shows a commit that was rolled back. In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state.

Metadata Log Entries🔗

To show table metadata log entries:

SELECT * from prod.db.table$metadata_log_entries;
timestamp file latest_snapshot_id latest_schema_id latest_sequence_number
2022-07-28 10:43:52.93 s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json null null null
2022-07-28 10:43:57.487 s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json 170260833677645300 0 1
2022-07-28 10:43:58.25 s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json 958906493976709774 0 2

Snapshots🔗

To show the valid snapshots for a table:

SELECT * FROM prod.db.table$snapshots;
committed_at snapshot_id parent_id operation manifest_list summary
2019-02-08 03:29:51.215 57897183625154 null append s3://.../table/metadata/snap-57897183625154-1.avro { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 }

You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:

select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['flink.job-id']
from prod.db.table$history h
join prod.db.table$snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at;
made_current_at operation snapshot_id is_current_ancestor summary[flink.job-id]
2019-02-08 03:29:51.215 append 57897183625154 true 2e274eecb503d85369fb390e8956c813

Files🔗

To show a table's current data files:

SELECT * FROM prod.db.table$files;
content file_path file_format spec_id partition record_count file_size_in_bytes column_sizes value_counts null_value_counts nan_value_counts lower_bounds upper_bounds key_metadata split_offsets equality_ids sort_order_id
0 s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet PARQUET 0 {1999-01-01, 01} 1 597 [1 -> 90, 2 -> 62] [1 -> 1, 2 -> 1] [1 -> 0, 2 -> 0] [] [1 -> , 2 -> c] [1 -> , 2 -> c] null [4] null null
0 s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet PARQUET 0 {1999-01-01, 02} 1 597 [1 -> 90, 2 -> 62] [1 -> 1, 2 -> 1] [1 -> 0, 2 -> 0] [] [1 -> , 2 -> b] [1 -> , 2 -> b] null [4] null null
0 s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet PARQUET 0 {1999-01-01, 03} 1 597 [1 -> 90, 2 -> 62] [1 -> 1, 2 -> 1] [1 -> 0, 2 -> 0] [] [1 -> , 2 -> a] [1 -> , 2 -> a] null [4] null null

Manifests🔗

To show a table's current file manifests:

SELECT * FROM prod.db.table$manifests;
path length partition_spec_id added_snapshot_id added_data_files_count existing_data_files_count deleted_data_files_count partition_summaries
s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro 4479 0 6668963634911763636 8 0 0 [[false,null,2019-05-13,2019-05-15]]

Note:

  1. Fields within partition_summaries column of the manifests table correspond to field_summary structs within manifest list, with the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 table, where contains_nan is not populated.

Partitions🔗

To show a table's current partitions:

SELECT * FROM prod.db.table$partitions;
partition spec_id record_count file_count total_data_file_size_in_bytes position_delete_record_count position_delete_file_count equality_delete_record_count equality_delete_file_count last_updated_at(μs) last_updated_snapshot_id
{20211001, 11} 0 1 1 100 2 1 0 0 1633086034192000 9205185327307503337
{20211002, 11} 0 4 3 500 1 1 0 0 1633172537358000 867027598972211003
{20211001, 10} 0 7 4 700 0 0 0 0 1633082598716000 3280122546965981531
{20211002, 10} 0 3 2 400 0 0 1 1 1633169159489000 6941468797545315876

Note: For unpartitioned tables, the partitions table will not contain the partition and spec_id fields.

All Metadata Tables🔗

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.

Danger

The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.

All Data Files🔗

To show all of the table's data files and each file's metadata:

SELECT * FROM prod.db.table$all_data_files;
content file_path file_format partition record_count file_size_in_bytes column_sizes value_counts null_value_counts nan_value_counts lower_bounds upper_bounds key_metadata split_offsets equality_ids sort_order_id
0 s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet PARQUET {20210102} 14 2444 {1 -> 94, 2 -> 17} {1 -> 14, 2 -> 14} {1 -> 0, 2 -> 0} {} {1 -> 1, 2 -> 20210102} {1 -> 2, 2 -> 20210102} null [4] null 0
0 s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet PARQUET {20210103} 14 2444 {1 -> 94, 2 -> 17} {1 -> 14, 2 -> 14} {1 -> 0, 2 -> 0} {} {1 -> 1, 2 -> 20210103} {1 -> 3, 2 -> 20210103} null [4] null 0
0 s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet PARQUET {20210104} 14 2444 {1 -> 94, 2 -> 17} {1 -> 14, 2 -> 14} {1 -> 0, 2 -> 0} {} {1 -> 1, 2 -> 20210104} {1 -> 3, 2 -> 20210104} null [4] null 0

All Manifests🔗

To show all of the table's manifest files:

SELECT * FROM prod.db.table$all_manifests;
path length partition_spec_id added_snapshot_id added_data_files_count existing_data_files_count deleted_data_files_count partition_summaries
s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro 6376 0 6272782676904868561 2 0 0 [{false, false, 20210101, 20210101}]

Note:

  1. Fields within partition_summaries column of the manifests table correspond to field_summary structs within manifest list, with the following order:
    • contains_null
    • contains_nan
    • lower_bound
    • upper_bound
  2. contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from V1 table, where contains_nan is not populated.

References🔗

To show a table's known snapshot references:

SELECT * FROM prod.db.table$refs;
name type snapshot_id max_reference_age_in_ms min_snapshots_to_keep max_snapshot_age_in_ms
main BRANCH 4686954189838128572 10 20 30
testTag TAG 4686954189838128572 10 null null