Class IcebergSink

  • All Implemented Interfaces:
    java.io.Serializable, org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,​org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>

    @Experimental
    public class IcebergSink
    extends java.lang.Object
    implements org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,​org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
    The Flink v2 sink offers several hooks for inserting custom topologies into the sink. This sink uses the following:
    • SupportsPreWriteTopology, which redistributes the data to the writers based on the DistributionMode
    • SinkWriter, which writes data/delete files and generates the WriteResult objects for the files
    • SupportsPreCommitTopology, which is used to place the IcebergWriteAggregator that merges the WriteResults of the individual SinkWriters into a single IcebergCommittable
    • IcebergCommitter, which commits the incoming IcebergCommittables to the Iceberg table
    • SupportsPostCommitTopology, which could be used for incremental compaction later. This is not implemented yet.
    The job graph looks like this:
    
                                Flink sink
                   +-----------------------------------------------------------------------------------+
                   |                                                                                   |
     +-------+     | +----------+                               +-------------+      +---------------+ |
     | Map 1 | ==> | | writer 1 |                               | committer 1 | ---> | post commit 1 | |
     +-------+     | +----------+                               +-------------+      +---------------+ |
                   |             \                             /                \                      |
                   |              \                           /                  \                     |
                   |               \                         /                    \                    |
     +-------+     | +----------+   \ +-------------------+ /   +-------------+    \ +---------------+ |
     | Map 2 | ==> | | writer 2 | --->| commit aggregator |     | committer 2 |      | post commit 2 | |
     +-------+     | +----------+     +-------------------+     +-------------+      +---------------+ |
                   |                                             Commit only on                        |
                   |                                             committer 1                           |
                   +-----------------------------------------------------------------------------------+
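
    The aggregation step in the graph above can be illustrated with a small model. This is a minimal sketch using only the Java standard library, not the Iceberg implementation: WriterOutput and Committable are hypothetical stand-ins for WriteResult and IcebergCommittable, and grouping the results by checkpoint id is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration only; these are NOT the Iceberg classes.
class CommitAggregationSketch {

  /** Stand-in for a WriteResult: the files one writer subtask produced. */
  static final class WriterOutput {
    final long checkpointId;
    final List<String> dataFiles;

    WriterOutput(long checkpointId, List<String> dataFiles) {
      this.checkpointId = checkpointId;
      this.dataFiles = dataFiles;
    }
  }

  /** Stand-in for an IcebergCommittable: everything to commit in one go. */
  static final class Committable {
    final long checkpointId;
    final List<String> dataFiles;

    Committable(long checkpointId, List<String> dataFiles) {
      this.checkpointId = checkpointId;
      this.dataFiles = dataFiles;
    }
  }

  /**
   * Plays the role of the commit aggregator: merges the outputs of all
   * writers into a single committable per checkpoint (assumed grouping).
   */
  static List<Committable> aggregate(List<WriterOutput> outputs) {
    // TreeMap keeps checkpoints in ascending order.
    Map<Long, List<String>> byCheckpoint = new TreeMap<>();
    for (WriterOutput output : outputs) {
      byCheckpoint
          .computeIfAbsent(output.checkpointId, id -> new ArrayList<>())
          .addAll(output.dataFiles);
    }
    List<Committable> committables = new ArrayList<>();
    for (Map.Entry<Long, List<String>> entry : byCheckpoint.entrySet()) {
      committables.add(new Committable(entry.getKey(), entry.getValue()));
    }
    return committables;
  }

  public static void main(String[] args) {
    List<WriterOutput> outputs = List.of(
        new WriterOutput(1L, List.of("writer1-a.parquet")),
        new WriterOutput(1L, List.of("writer2-a.parquet", "writer2-b.parquet")),
        new WriterOutput(2L, List.of("writer1-b.parquet")));
    for (Committable committable : aggregate(outputs)) {
      System.out.println(
          "checkpoint " + committable.checkpointId + " -> " + committable.dataFiles);
    }
  }
}
```

    Funnelling all writer results through one aggregator means a single committer receives one committable to apply, which matches the "Commit only on committer 1" note in the graph.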
     
    See Also:
    Serialized Form
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Class Description
      static class  IcebergSink.Builder  
      • Nested classes/interfaces inherited from interface org.apache.flink.api.connector.sink2.Sink

        org.apache.flink.api.connector.sink2.Sink.InitContext, org.apache.flink.api.connector.sink2.Sink.InitContextWrapper
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void addPostCommitTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)  
      org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)  
      org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)  
      static <T> IcebergSink.Builder builderFor​(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T,​org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
      Initialize an IcebergSink.Builder to export the data from a generic input data stream into an Iceberg table.
      org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter​(org.apache.flink.api.connector.sink2.CommitterInitContext context)  
      org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter​(org.apache.flink.api.connector.sink2.Sink.InitContext context)  
      static IcebergSink.Builder forRow​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.api.TableSchema tableSchema)
      Initialize an IcebergSink.Builder to export the data from an input data stream of Rows into an Iceberg table.
      static IcebergSink.Builder forRowData​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
      Initialize an IcebergSink.Builder to export the data from an input data stream of RowData records into an Iceberg table.
      org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()  
      org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
      • Methods inherited from interface org.apache.flink.api.connector.sink2.Sink

        createWriter
    • Method Detail

      • createWriter

        public org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter​(org.apache.flink.api.connector.sink2.Sink.InitContext context)
        Specified by:
        createWriter in interface org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>
      • createCommitter

        public org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter​(org.apache.flink.api.connector.sink2.CommitterInitContext context)
        Specified by:
        createCommitter in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
      • getCommittableSerializer

        public org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()
        Specified by:
        getCommittableSerializer in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
      • addPostCommitTopology

        public void addPostCommitTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
        Specified by:
        addPostCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
      • addPreWriteTopology

        public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
        Specified by:
        addPreWriteTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
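
        The class description says the pre-write topology redistributes rows to the writers based on the DistributionMode. The effect of such a choice can be sketched with a simplified model. Everything here is hypothetical and standard-library only: the local Mode enum merely mirrors two of the mode names, and the modulo-based routing is an assumption for illustration, not how Flink or Iceberg actually assign rows to subtasks.

```java
// Hypothetical model for illustration; not the Iceberg or Flink implementation.
class PreWriteDistributionSketch {

  /** Local stand-in for two distribution modes; the real enum lives in the Iceberg library. */
  enum Mode { NONE, HASH }

  /**
   * Picks the writer subtask for a row.
   * NONE: the row stays on the subtask that produced it (no shuffle).
   * HASH: every row with the same partition key goes to the same writer.
   */
  static int targetWriter(Mode mode, String partitionKey, int upstreamSubtask, int writerCount) {
    if (mode == Mode.HASH) {
      // floorMod keeps the index non-negative even for negative hash codes.
      return Math.floorMod(partitionKey.hashCode(), writerCount);
    }
    return upstreamSubtask % writerCount;
  }

  public static void main(String[] args) {
    String[] partitionKeys = {"2024-01-01", "2024-01-02", "2024-01-01", "2024-01-02"};
    int writerCount = 2;
    for (Mode mode : Mode.values()) {
      for (int i = 0; i < partitionKeys.length; i++) {
        int upstreamSubtask = i % writerCount;
        int writer = targetWriter(mode, partitionKeys[i], upstreamSubtask, writerCount);
        System.out.println(mode + ": " + partitionKeys[i] + " -> writer " + writer);
      }
    }
  }
}
```

        In this model, hashing on the partition key means each partition is written by one writer only, so fewer files are opened per partition, at the cost of a shuffle before the writers.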
      • addPreCommitTopology

        public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
        Specified by:
        addPreCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,​org.apache.iceberg.flink.sink.IcebergCommittable>
      • getWriteResultSerializer

        public org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()
        Specified by:
        getWriteResultSerializer in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,​org.apache.iceberg.flink.sink.IcebergCommittable>
      • builderFor

        public static <T> IcebergSink.Builder builderFor​(org.apache.flink.streaming.api.datastream.DataStream<T> input,
                                                         org.apache.flink.api.common.functions.MapFunction<T,​org.apache.flink.table.data.RowData> mapper,
                                                         org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
        Initialize an IcebergSink.Builder to export the data from a generic input data stream into an Iceberg table. The sink connector uses RowData internally, so users need to provide a mapper function and a TypeInformation to convert the generic records to a RowData DataStream.
        Type Parameters:
        T - the data type of the records.
        Parameters:
        input - the generic source input data stream.
        mapper - function to convert the generic data to RowData.
        outputType - the TypeInformation of the RowData records produced by the mapper.
        Returns:
        an IcebergSink.Builder for connecting to the Iceberg table.
      • forRow

        public static IcebergSink.Builder forRow​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input,
                                                 org.apache.flink.table.api.TableSchema tableSchema)
        Initialize an IcebergSink.Builder to export the data from an input data stream of Rows into an Iceberg table. The sink connector uses RowData internally, so users need to provide a TableSchema that the builder uses to convert the Rows to a RowData DataStream.
        Parameters:
        input - the source input data stream of Rows.
        tableSchema - defines the TypeInformation for the input data.
        Returns:
        an IcebergSink.Builder for connecting to the Iceberg table.
      • forRowData

        public static IcebergSink.Builder forRowData​(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
        Initialize an IcebergSink.Builder to export the data from an input data stream of RowData records into an Iceberg table.
        Parameters:
        input - the source input data stream of RowData records.
        Returns:
        an IcebergSink.Builder for connecting to the Iceberg table.