Package org.apache.iceberg.flink.sink
Class IcebergSink
- java.lang.Object
-
- org.apache.iceberg.flink.sink.IcebergSink
-
- All Implemented Interfaces:
java.io.Serializable,
org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>,
org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>,
org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>,
org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>,
org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
@Experimental public class IcebergSink extends java.lang.Object implements org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>, org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>, org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>, org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
The Flink v2 sink API offers several hooks for inserting custom topologies into the sink. This sink uses the following:
- SupportsPreWriteTopology, which redistributes the data to the writers based on the DistributionMode.
- SinkWriter, which writes data/delete files and generates the WriteResult objects for those files.
- SupportsPreCommitTopology, which is used to place the IcebergWriteAggregator that merges the individual SinkWriters' WriteResults into a single IcebergCommittable.
- IcebergCommitter, which commits the incoming IcebergCommittables to the Iceberg table.
- SupportsPostCommitTopology, which could be used for incremental compaction later. This is not implemented yet.
                            Flink sink
               +-----------------------------------------------------------------------------------+
               |                                                                                   |
 +-------+     | +----------+                               +-------------+      +---------------+ |
 | Map 1 | ==> | | writer 1 |                               | committer 1 | ---> | post commit 1 | |
 +-------+     | +----------+                               +-------------+      +---------------+ |
               |             \                             /                \                      |
               |              \                           /                  \                     |
               |               \                         /                    \                    |
 +-------+     | +----------+   \ +-------------------+ /   +-------------+    \ +---------------+ |
 | Map 2 | ==> | | writer 2 | --->| commit aggregator |     | committer 2 |      | post commit 2 | |
 +-------+     | +----------+     +-------------------+     +-------------+      +---------------+ |
               |                                             Commit only on                        |
               |                                             committer 1                           |
               +-----------------------------------------------------------------------------------+
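The aggregate-then-commit flow described above can be sketched in plain Java. This is a toy model only: WriterOutput, Committable, aggregate and commit are hypothetical stand-ins invented for illustration, not the real WriteResult, IcebergCommittable, IcebergWriteAggregator or IcebergCommitter, and the sketch does not depend on Flink or Iceberg. It shows only the shape of the idea: many per-writer results are merged into one unit per checkpoint, and that unit is committed once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of the sink stages. None of these types are the real Iceberg or Flink classes. */
final class SinkTopologySketch {

  /** Hypothetical stand-in for WriteResult: files one writer produced for one checkpoint. */
  static final class WriterOutput {
    final List<String> dataFiles;
    final List<String> deleteFiles;

    WriterOutput(List<String> dataFiles, List<String> deleteFiles) {
      this.dataFiles = dataFiles;
      this.deleteFiles = deleteFiles;
    }
  }

  /** Hypothetical stand-in for IcebergCommittable: all files of one checkpoint. */
  static final class Committable {
    final long checkpointId;
    final List<String> dataFiles;
    final List<String> deleteFiles;

    Committable(long checkpointId, List<String> dataFiles, List<String> deleteFiles) {
      this.checkpointId = checkpointId;
      this.dataFiles = dataFiles;
      this.deleteFiles = deleteFiles;
    }
  }

  /** Pre-commit step: merge the outputs of every writer into a single committable. */
  static Committable aggregate(long checkpointId, List<WriterOutput> outputs) {
    List<String> data = new ArrayList<>();
    List<String> deletes = new ArrayList<>();
    for (WriterOutput output : outputs) {
      data.addAll(output.dataFiles);
      deletes.addAll(output.deleteFiles);
    }
    return new Committable(checkpointId, data, deletes);
  }

  /** Commit step: record one table change per committable and return the change count. */
  static int commit(List<Committable> tableLog, Committable committable) {
    tableLog.add(committable);
    return tableLog.size();
  }

  public static void main(String[] args) {
    // Two writers finish the same checkpoint with different files.
    List<WriterOutput> outputs = Arrays.asList(
        new WriterOutput(Arrays.asList("data-1.parquet"), Collections.<String>emptyList()),
        new WriterOutput(Arrays.asList("data-2.parquet"), Arrays.asList("delete-2.parquet")));

    Committable merged = aggregate(1L, outputs);
    List<Committable> tableLog = new ArrayList<>();
    commit(tableLog, merged);

    System.out.println(merged.dataFiles + " " + merged.deleteFiles + " commits=" + tableLog.size());
  }
}
```

Merging before committing means the table sees a single change per checkpoint no matter how many writers ran, which matches the "Commit only on committer 1" note in the diagram.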
- See Also:
- Serialized Form
Nested Class Summary
Modifier and Type    Class    Description
static class
IcebergSink.Builder
-
Method Summary
Modifier and Type    Method    Description

void
addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)

org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>>
addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)

org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData>
addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)

static <T> IcebergSink.Builder
builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T,org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
Initialize an IcebergSink.Builder to export data from a generic input data stream into an Iceberg table.

org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable>
createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)

org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData>
createWriter(org.apache.flink.api.connector.sink2.Sink.InitContext context)

static IcebergSink.Builder
forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.api.TableSchema tableSchema)
Initialize an IcebergSink.Builder to export data from an input data stream of Rows into an Iceberg table.

static IcebergSink.Builder
forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
Initialize an IcebergSink.Builder to export data from an input data stream of RowData records into an Iceberg table.

org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable>
getCommittableSerializer()

org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult>
getWriteResultSerializer()
-
-
-
Method Detail
-
createWriter
public org.apache.flink.api.connector.sink2.SinkWriter<org.apache.flink.table.data.RowData> createWriter(org.apache.flink.api.connector.sink2.Sink.InitContext context)
- Specified by:
createWriter in interface org.apache.flink.api.connector.sink2.Sink<org.apache.flink.table.data.RowData>
-
createCommitter
public org.apache.flink.api.connector.sink2.Committer<org.apache.iceberg.flink.sink.IcebergCommittable> createCommitter(org.apache.flink.api.connector.sink2.CommitterInitContext context)
- Specified by:
createCommitter in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
-
getCommittableSerializer
public org.apache.flink.core.io.SimpleVersionedSerializer<org.apache.iceberg.flink.sink.IcebergCommittable> getCommittableSerializer()
- Specified by:
getCommittableSerializer in interface org.apache.flink.api.connector.sink2.SupportsCommitter<org.apache.iceberg.flink.sink.IcebergCommittable>
-
addPostCommitTopology
public void addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> committables)
- Specified by:
addPostCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology<org.apache.iceberg.flink.sink.IcebergCommittable>
-
addPreWriteTopology
public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> addPreWriteTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> inputDataStream)
- Specified by:
addPreWriteTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology<org.apache.flink.table.data.RowData>
-
addPreCommitTopology
public org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.iceberg.flink.sink.IcebergCommittable>> addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<WriteResult>> writeResults)
- Specified by:
addPreCommitTopology in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>
-
getWriteResultSerializer
public org.apache.flink.core.io.SimpleVersionedSerializer<WriteResult> getWriteResultSerializer()
- Specified by:
getWriteResultSerializer in interface org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology<WriteResult,org.apache.iceberg.flink.sink.IcebergCommittable>
-
builderFor
public static <T> IcebergSink.Builder builderFor(org.apache.flink.streaming.api.datastream.DataStream<T> input, org.apache.flink.api.common.functions.MapFunction<T,org.apache.flink.table.data.RowData> mapper, org.apache.flink.api.common.typeinfo.TypeInformation<org.apache.flink.table.data.RowData> outputType)
Initialize an IcebergSink.Builder to export data from a generic input data stream into an Iceberg table. The sink connector uses RowData internally, so users need to provide a mapper function and a TypeInformation to convert the generic records into a RowData DataStream.
- Type Parameters:
T - the data type of the records.
- Parameters:
input - the generic source input data stream.
mapper - function to convert the generic data to RowData.
outputType - the TypeInformation of the RowData records produced by the mapper.
- Returns:
IcebergSink.Builder to connect to the Iceberg table.
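The reason a mapper is required can be illustrated with a small plain-Java sketch of the same generic-factory pattern. Everything here is hypothetical and exists only for illustration: Row stands in for RowData, Builder for IcebergSink.Builder, a List for the DataStream, java.util.function.Function for MapFunction, and Click is an invented user record. The outputType argument has no counterpart in this toy model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy illustration of the builderFor pattern. This is not the Iceberg implementation. */
final class BuilderForSketch {

  /** Hypothetical stand-in for RowData: an array of column values. */
  static final class Row {
    final Object[] fields;

    Row(Object... fields) {
      this.fields = fields;
    }
  }

  /** Hypothetical stand-in for IcebergSink.Builder: it only ever sees the internal row type. */
  static final class Builder {
    final List<Row> rows;

    Builder(List<Row> rows) {
      this.rows = rows;
    }
  }

  /** Generic entry point: convert every record with the mapper, then hand the rows to the builder. */
  static <T> Builder builderFor(List<T> input, Function<T, Row> mapper) {
    List<Row> rows = new ArrayList<>();
    for (T record : input) {
      rows.add(mapper.apply(record));
    }
    return new Builder(rows);
  }

  /** Hypothetical user record type that the sink knows nothing about. */
  static final class Click {
    final String user;
    final long timestamp;

    Click(String user, long timestamp) {
      this.user = user;
      this.timestamp = timestamp;
    }
  }

  public static void main(String[] args) {
    List<Click> clicks = Arrays.asList(new Click("ann", 10L), new Click("bob", 20L));

    // The caller supplies the conversion; the builder never touches Click directly.
    Builder builder = builderFor(clicks, c -> new Row(c.user, c.timestamp));

    System.out.println(builder.rows.size() + " rows, first user=" + builder.rows.get(0).fields[0]);
  }
}
```

Keeping the conversion at the entry point lets the rest of the pipeline work with a single row representation regardless of the caller's record type.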
-
forRow
public static IcebergSink.Builder forRow(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.types.Row> input, org.apache.flink.table.api.TableSchema tableSchema)
Initialize an IcebergSink.Builder to export data from an input data stream of Rows into an Iceberg table. The sink connector uses RowData internally, so users need to provide a TableSchema that the builder uses to convert those Rows into a RowData DataStream.
- Parameters:
input - the source input data stream of Rows.
tableSchema - defines the TypeInformation for the input data.
- Returns:
IcebergSink.Builder to connect to the Iceberg table.
-
forRowData
public static IcebergSink.Builder forRowData(org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.table.data.RowData> input)
Initialize an IcebergSink.Builder to export data from an input data stream of RowData records into an Iceberg table.
- Parameters:
input - the source input data stream of RowData records.
- Returns:
IcebergSink.Builder to connect to the Iceberg table.