Class IcebergSink

java.lang.Object
org.apache.iceberg.flink.sink.IcebergSink
All Implemented Interfaces:
Serializable, org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>

@Experimental public class IcebergSink extends Object implements org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
The Flink v2 sink offers several hooks for inserting custom topologies into the sink. We use the following:
  • SupportsPreWriteTopology, which redistributes the data to the writers based on the DistributionMode
  • SinkWriter, which writes data/delete files and generates the WriteResult objects for those files
  • SupportsPreCommitTopology, which we use to place the IcebergWriteAggregator that merges the individual SinkWriters' WriteResults into a single IcebergCommittable
  • IcebergCommitter, which commits the incoming IcebergCommittables to the Iceberg table
  • SupportsPostCommitTopology, which we could later use for incremental compaction. This is not implemented yet.
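The redistribution done by the pre-write topology can be pictured with a minimal, JDK-only sketch. It assumes a hash-style DistributionMode in which a row's partition key alone decides which writer receives it. The class PreWriteRedistributionSketch and its methods are hypothetical, and plain hashCode-modulo routing is a simplification of Flink's key-group hashing, not the code IcebergSink actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical model of hash-style pre-write redistribution; not Iceberg or Flink code.
final class PreWriteRedistributionSketch {

  private PreWriteRedistributionSketch() {}

  // Returns the writer index in [0, parallelism) for a partition key.
  // Simplification: Flink's keyBy routes through key groups with murmur hashing instead.
  static int writerFor(String partitionKey, int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("parallelism must be positive");
    }
    // floorMod keeps the index non-negative when hashCode() is negative.
    return Math.floorMod(Objects.hashCode(partitionKey), parallelism);
  }

  // Groups the partition keys of incoming rows into one list per writer.
  static List<List<String>> redistribute(List<String> partitionKeys, int parallelism) {
    List<List<String>> perWriter = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      perWriter.add(new ArrayList<>());
    }
    for (String key : partitionKeys) {
      perWriter.get(writerFor(key, parallelism)).add(key);
    }
    return perWriter;
  }
}
```

Routing by partition key means each writer touches fewer partitions and so opens fewer files; the trade-off in this model is skew when a few keys dominate the input.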
The job graph looks like this:

                            Flink sink
               +-----------------------------------------------------------------------------------+
               |                                                                                   |
 +-------+     | +----------+                               +-------------+      +---------------+ |
 | Map 1 | ==> | | writer 1 |                               | committer 1 | ---> | post commit 1 | |
 +-------+     | +----------+                               +-------------+      +---------------+ |
               |             \                             /                \                      |
               |              \                           /                  \                     |
               |               \                         /                    \                    |
 +-------+     | +----------+   \ +-------------------+ /   +-------------+    \ +---------------+ |
 | Map 2 | ==> | | writer 2 | --->| commit aggregator |     | committer 2 |      | post commit 2 | |
 +-------+     | +----------+     +-------------------+     +-------------+      +---------------+ |
               |                                             Commit only on                        |
               |                                             committer 1                           |
               +-----------------------------------------------------------------------------------+
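The aggregation and single-committer behavior shown in the graph can be modeled as follows. This is a simplified sketch under stated assumptions: CommitPathSketch, WriterOutput and MergedCommittable are hypothetical stand-ins for the committer, WriteResult and IcebergCommittable, data files are plain strings, and "committer 1" is modeled as subtask index 0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the writer -> aggregator -> committer path; not Iceberg's classes.
final class CommitPathSketch {

  // Stand-in for one writer's output (a WriteResult) for a single checkpoint.
  static final class WriterOutput {
    final long checkpointId;
    final List<String> dataFiles;

    WriterOutput(long checkpointId, List<String> dataFiles) {
      this.checkpointId = checkpointId;
      this.dataFiles = List.copyOf(dataFiles);
    }
  }

  // Stand-in for the single merged committable (an IcebergCommittable) of a checkpoint.
  static final class MergedCommittable {
    final long checkpointId;
    final List<String> dataFiles;

    MergedCommittable(long checkpointId, List<String> dataFiles) {
      this.checkpointId = checkpointId;
      this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
    }
  }

  private CommitPathSketch() {}

  // Aggregator: merges every writer's output for one checkpoint into one committable.
  static MergedCommittable aggregate(long checkpointId, List<WriterOutput> outputs) {
    List<String> files = new ArrayList<>();
    for (WriterOutput output : outputs) {
      if (output.checkpointId != checkpointId) {
        throw new IllegalArgumentException(
            "output for checkpoint " + output.checkpointId + " passed to " + checkpointId);
      }
      files.addAll(output.dataFiles);
    }
    return new MergedCommittable(checkpointId, files);
  }

  // Committer: only subtask 0 ("committer 1" in the graph) commits; others do nothing.
  // Returns the files committed by this subtask.
  static List<String> commit(int committerSubtask, MergedCommittable committable) {
    if (committerSubtask != 0) {
      return List.of();
    }
    return committable.dataFiles;
  }
}
```

In this model the table receives one commit per checkpoint instead of one per writer, which is the effect of merging the WriteResults into a single IcebergCommittable.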
 
See Also:
  • Nested Class Summary

    Nested Classes
    Modifier and Type
    Class
    Description
    static class
    IcebergSink.Builder

    Nested classes/interfaces inherited from interface org.apache.flink.api.connector.sink2.Sink

    org.apache.flink.api.connector.sink2.Sink.InitContext, org.apache.flink.api.connector.sink2.Sink.InitContextWrapper
  • Method Summary

    Modifier and Type
    Method
    Description
    void
    addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
     
    org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>>
    addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
     
    org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData>
    addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
     
    static <T> IcebergSink.Builder
    builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T,org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
    Initializes an IcebergSink.Builder to export data from a generic input data stream into an Iceberg table.
    org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable>
    createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)
     
    org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData>
    createWriter(org.apache.flink.api.connector.sink2.Sink.InitContext context)
     
    static IcebergSink.Builder
    forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.api.TableSchema tableSchema)
    Initializes an IcebergSink.Builder to export data from an input data stream of Rows into an Iceberg table.
    static IcebergSink.Builder
    forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
    Initializes an IcebergSink.Builder to export data from an input data stream of RowData records into an Iceberg table.
    org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable>
    getCommittableSerializer()
     
    org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult>
    getWriteResultSerializer()
     

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

    Methods inherited from interface org.apache.flink.api.connector.sink2.Sink

    createWriter
  • Method Details

    • createWriter

      public org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter(org.apache.flink.api.connector.sink2.Sink.InitContext context)
      Specified by:
      createWriter in interface org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>
    • createCommitter

      public org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)
      Specified by:
      createCommitter in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
    • getCommittableSerializer

      public org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()
      Specified by:
      getCommittableSerializer in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
    • addPostCommitTopology

      public void addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
      Specified by:
      addPostCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
    • addPreWriteTopology

      public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
      Specified by:
      addPreWriteTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
    • addPreCommitTopology

      public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
      Specified by:
      addPreCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>
    • getWriteResultSerializer

      public org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()
      Specified by:
      getWriteResultSerializer in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>
    • builderFor

      public static <T> IcebergSink.Builder builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T,org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
      Initializes an IcebergSink.Builder to export data from a generic input data stream into an Iceberg table. The sink connector uses RowData internally, so users need to provide a mapper function and a TypeInformation to convert the generic records into a RowData DataStream.
      Type Parameters:
      T - the data type of records.
      Parameters:
      input - the generic source input data stream.
      mapper - function to convert the generic data to RowData
      outputType - the TypeInformation of the RowData records produced by mapper.
      Returns:
      an IcebergSink.Builder for connecting to the Iceberg table.
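The mapper contract of builderFor can be illustrated without Flink on the classpath. In this sketch java.util.function.Function stands in for MapFunction<T, RowData> and Object[] stands in for RowData; the MapperSketch class, the Order type and the id, sku, quantity column order are all hypothetical. It assumes the sink reads each row positionally against the table's columns, so the mapper must emit fields in that order.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration of the builderFor mapper contract without Flink:
// Function stands in for MapFunction<T, RowData>, Object[] stands in for RowData.
final class MapperSketch {

  // Example user record type (hypothetical).
  static final class Order {
    final long id;
    final String sku;
    final int quantity;

    Order(long id, String sku, int quantity) {
      this.id = id;
      this.sku = sku;
      this.quantity = quantity;
    }
  }

  // Assumed column order of the target table.
  static final List<String> COLUMNS = List.of("id", "sku", "quantity");

  // Mapper: emits field values in exactly the order of COLUMNS.
  static final Function<Order, Object[]> ORDER_TO_ROW =
      order -> new Object[] {order.id, order.sku, order.quantity};

  private MapperSketch() {}

  // Applies the mapper to every record, roughly like the builder's mapping step
  // before the data reaches the writers.
  static <T> List<Object[]> mapAll(List<T> records, Function<T, Object[]> mapper) {
    return records.stream().map(mapper).collect(Collectors.toList());
  }
}
```

With the real API, the same idea is supplied through the mapper and outputType arguments of builderFor.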
    • forRow

      public static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.api.TableSchema tableSchema)
      Initializes an IcebergSink.Builder to export data from an input data stream of Rows into an Iceberg table. The sink connector uses RowData internally, so users need to provide a TableSchema that the builder uses to convert those Rows into a RowData DataStream.
      Parameters:
      input - the source input data stream with Rows.
      tableSchema - defines the TypeInformation for input data.
      Returns:
      an IcebergSink.Builder for connecting to the Iceberg table.
    • forRowData

      public static IcebergSink.Builder forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
      Initializes an IcebergSink.Builder to export data from an input data stream of RowData records into an Iceberg table.
      Parameters:
      input - the source input data stream of RowData records.
      Returns:
      an IcebergSink.Builder for connecting to the Iceberg table.