Class FlinkSink.Builder

java.lang.Object
  org.apache.iceberg.flink.sink.FlinkSink.Builder

Enclosing class:
FlinkSink

public static class FlinkSink.Builder extends java.lang.Object
Method Summary
All Methods | Instance Methods | Concrete Methods

org.apache.flink.streaming.api.datastream.DataStreamSink<java.lang.Void> append()
  Append the iceberg sink operators to write records to the iceberg table.
FlinkSink.Builder distributionMode(DistributionMode mode)
  Configure the write DistributionMode that the Flink sink will use.
FlinkSink.Builder equalityFieldColumns(java.util.List<java.lang.String> columns)
  Configure the equality field columns for an iceberg table that accepts CDC or UPSERT events.
FlinkSink.Builder flinkConf(org.apache.flink.configuration.ReadableConfig config)
FlinkSink.Builder overwrite(boolean newOverwrite)
FlinkSink.Builder rangeDistributionSortKeyBaseWeight(double weight)
  If the sort order contains partition columns, each sort key maps to one partition and data file.
FlinkSink.Builder rangeDistributionStatisticsType(StatisticsType type)
  Range distribution collects statistics about the data distribution in order to shuffle records in a relatively balanced way.
FlinkSink.Builder set(java.lang.String property, java.lang.String value)
  Set the write properties for the Flink sink.
FlinkSink.Builder setAll(java.util.Map<java.lang.String,java.lang.String> properties)
  Set the write properties for the Flink sink.
FlinkSink.Builder setSnapshotProperties(java.util.Map<java.lang.String,java.lang.String> properties)
FlinkSink.Builder setSnapshotProperty(java.lang.String property, java.lang.String value)
FlinkSink.Builder table(Table newTable)
FlinkSink.Builder tableLoader(TableLoader newTableLoader)
  The table loader is used for lazily loading tables in IcebergFilesCommitter.
FlinkSink.Builder tableSchema(org.apache.flink.table.api.TableSchema newTableSchema)
FlinkSink.Builder toBranch(java.lang.String branch)
FlinkSink.Builder uidPrefix(java.lang.String newPrefix)
  Set the uid prefix for FlinkSink operators.
FlinkSink.Builder upsert(boolean enabled)
  All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events.
FlinkSink.Builder writeParallelism(int newWriteParallelism)
  Configure the write parallelism for the iceberg stream writer.
-
Method Detail
-
table
public FlinkSink.Builder table(Table newTable)
This iceberg Table instance is used for initializing the IcebergStreamWriter, which writes all records into DataFiles and emits them to the downstream operator. Providing a table avoids loading the table from each separate task.
Parameters: newTable - the loaded iceberg table instance.
Returns: FlinkSink.Builder to connect the iceberg table.
-
tableLoader
public FlinkSink.Builder tableLoader(TableLoader newTableLoader)
The table loader is used for lazily loading tables in IcebergFilesCommitter. This loader is needed because Table is not serializable, so the table loaded via Builder#table cannot simply be reused in the remote task manager.
Parameters: newTableLoader - to load the iceberg table inside tasks.
Returns: FlinkSink.Builder to connect the iceberg table.
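The reason a loader, rather than the table itself, is shipped to tasks can be sketched in plain Java. This is a simplified illustration of the pattern, not the actual Iceberg classes; all type names here are stand-ins:

```java
import java.io.Serializable;

// Sketch of the "ship a serializable loader, not the object" pattern that
// tableLoader(...) relies on: Table is not serializable, so a serializable
// loader travels to the remote task, which loads the table lazily on first
// use. All types are simplified stand-ins, not the real Iceberg classes.
class LazyLoaderSketch {
    static class Table { // stand-in for the non-serializable Table
        final String name;
        Table(String name) { this.name = name; }
    }

    interface TableLoader extends Serializable { // shippable to tasks
        Table loadTable();
    }

    static class Committer implements Serializable {
        private final TableLoader loader;  // serialized with the operator
        private transient Table table;     // re-created on the task manager

        Committer(TableLoader loader) { this.loader = loader; }

        Table table() {
            if (table == null) {
                table = loader.loadTable(); // lazy load inside the task
            }
            return table;
        }
    }

    public static void main(String[] args) {
        Committer committer = new Committer(() -> new Table("db.events"));
        System.out.println(committer.table().name); // db.events
    }
}
```

The `transient` field is the key detail: the loaded table never crosses the serialization boundary, only the lightweight loader does.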
-
set
public FlinkSink.Builder set(java.lang.String property, java.lang.String value)
Set the write properties for the Flink sink. See FlinkWriteOptions for the supported properties.
-
setAll
public FlinkSink.Builder setAll(java.util.Map<java.lang.String,java.lang.String> properties)
Set the write properties for the Flink sink. See FlinkWriteOptions for the supported properties.
-
tableSchema
public FlinkSink.Builder tableSchema(org.apache.flink.table.api.TableSchema newTableSchema)
-
overwrite
public FlinkSink.Builder overwrite(boolean newOverwrite)
-
flinkConf
public FlinkSink.Builder flinkConf(org.apache.flink.configuration.ReadableConfig config)
-
distributionMode
public FlinkSink.Builder distributionMode(DistributionMode mode)
Configure the write DistributionMode that the Flink sink will use. Currently, Flink supports DistributionMode.NONE and DistributionMode.HASH.
Parameters: mode - to specify the write distribution mode.
Returns: FlinkSink.Builder to connect the iceberg table.
-
rangeDistributionStatisticsType
public FlinkSink.Builder rangeDistributionStatisticsType(StatisticsType type)
Range distribution needs to collect statistics about the data distribution in order to shuffle records in a relatively balanced way. In general, low-cardinality sort keys should use StatisticsType.Map and high-cardinality sort keys should use StatisticsType.Sketch. Refer to the StatisticsType Javadoc for more details.

Default is StatisticsType.Auto, where Map statistics are used initially. If the cardinality is higher than the threshold (currently 10K) defined in SketchUtil#OPERATOR_SKETCH_SWITCH_THRESHOLD, statistics collection automatically switches to sketch reservoir sampling.

Explicitly set the statistics type if the default behavior doesn't work.
Parameters: type - to specify the statistics type for range distribution.
Returns: FlinkSink.Builder to connect the iceberg table.
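The Auto switching behavior described above can be illustrated with a small plain-Java sketch. This is not the real Iceberg implementation: the 10K threshold and the switch rule come from this Javadoc, everything else is hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the StatisticsType.Auto behavior: start with Map
// statistics, and switch to sketch sampling once the observed sort-key
// cardinality crosses the threshold. Not the real Iceberg code.
class AutoStatisticsSketch {
    enum StatisticsType { MAP, SKETCH }

    static final int SKETCH_SWITCH_THRESHOLD = 10_000; // "currently 10K"

    private final Set<String> distinctKeys = new HashSet<>();
    private StatisticsType current = StatisticsType.MAP; // Auto starts with Map

    StatisticsType observe(String sortKey) {
        if (current == StatisticsType.MAP) {
            distinctKeys.add(sortKey);
            if (distinctKeys.size() > SKETCH_SWITCH_THRESHOLD) {
                // cardinality exceeded the threshold: switch to sketch sampling
                current = StatisticsType.SKETCH;
                distinctKeys.clear(); // exact key counts are no longer tracked
            }
        }
        return current;
    }

    public static void main(String[] args) {
        AutoStatisticsSketch stats = new AutoStatisticsSketch();
        StatisticsType last = StatisticsType.MAP;
        for (int i = 0; i <= 10_000; i++) { // 10,001 distinct keys
            last = stats.observe("key-" + i);
        }
        System.out.println(last); // SKETCH after crossing the threshold
    }
}
```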
-
rangeDistributionSortKeyBaseWeight
public FlinkSink.Builder rangeDistributionSortKeyBaseWeight(double weight)
If the sort order contains partition columns, each sort key maps to one partition and data file. This relative weight can avoid placing too many small files for sort keys with low traffic. It is a double value that defines the minimal weight for each sort key: `0.02` means each key has a base weight of `2%` of the target traffic weight per writer task.

E.g. the sink Iceberg table is partitioned daily by event time, and the data stream contains events from now up to 180 days ago. With event time, the traffic weight distribution across days typically has a long-tail pattern: the current day contains the most traffic, while the older days (the long tail) contain less and less. Assume the writer parallelism is `10` and the total weight across all 180 days is `10,000`, so the target traffic weight per writer task is `1,000`. Also assume the weight sum for the oldest 150 days is `1,000`. Normally, the range partitioner would put all of the oldest 150 days in one writer task, and that task would write 150 small files (one per day). Keeping 150 files open can consume a large amount of memory, and flushing and uploading 150 files (however small) at checkpoint time can also be slow. If this config is set to `0.02`, every sort key has a base weight of `2%` of the target weight of `1,000` for every writer task, which essentially avoids placing more than `50` data files (one per day) on one writer task, no matter how small they are.

This is only applicable to StatisticsType.Map for the low-cardinality scenario. For StatisticsType.Sketch, high-cardinality sort columns are usually not used as partition columns; otherwise, too many partitions and small files may be generated during write. The sketch range partitioner simply splits high-cardinality keys into ordered ranges.

Default is 0.0%.
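The worked example above can be checked with a few lines of arithmetic. The numbers are the ones used in the example paragraph (illustrative only, not values from the actual implementation):

```java
// Arithmetic from the worked example for rangeDistributionSortKeyBaseWeight:
// 180 daily partitions, writer parallelism 10, total traffic weight 10,000,
// base weight fraction 0.02. Numbers are illustrative, not implementation
// constants.
class SortKeyBaseWeightMath {
    static final double TOTAL_WEIGHT = 10_000;       // weight across all 180 days
    static final int WRITER_PARALLELISM = 10;
    static final double BASE_WEIGHT_FRACTION = 0.02; // the `0.02` setting (2%)

    static double targetWeightPerTask() {
        return TOTAL_WEIGHT / WRITER_PARALLELISM;    // 1,000 per writer task
    }

    static double baseWeightPerKey() {
        // every sort key weighs at least 2% of the per-task target
        return BASE_WEIGHT_FRACTION * targetWeightPerTask();
    }

    static double maxLowTrafficKeysPerTask() {
        // with a floor of 20 per key, a task with target weight 1,000 is
        // assigned at most 50 low-traffic keys (~50 small daily files)
        return targetWeightPerTask() / baseWeightPerKey();
    }

    public static void main(String[] args) {
        System.out.println(targetWeightPerTask());      // 1000.0
        System.out.println(baseWeightPerKey());         // 20.0
        System.out.println(maxLowTrafficKeysPerTask()); // 50.0
    }
}
```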
-
writeParallelism
public FlinkSink.Builder writeParallelism(int newWriteParallelism)
Configure the write parallelism for the iceberg stream writer.
Parameters: newWriteParallelism - the number of parallel iceberg stream writers.
Returns: FlinkSink.Builder to connect the iceberg table.
-
upsert
public FlinkSink.Builder upsert(boolean enabled)
All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means the sink will DELETE the old records and then INSERT the new records. For a partitioned table, the partition fields should be a subset of the equality fields; otherwise, an old row located in partition-A could not be deleted by a new row located in partition-B.
Parameters: enabled - indicates whether to transform all INSERT/UPDATE_AFTER events to UPSERT.
Returns: FlinkSink.Builder to connect the iceberg table.
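The DELETE-then-INSERT semantics can be sketched with a toy in-memory model. This is plain Java, not Flink or Iceberg code; the table is simulated as a map keyed by the equality field columns, and all names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory model of the upsert semantics described above: every
// INSERT/UPDATE_AFTER event becomes "delete the old row with the same
// equality key, then insert the new row". Not Flink/Iceberg code.
class UpsertSketch {
    // equality key -> current row
    private final Map<String, String> table = new LinkedHashMap<>();

    void upsert(String equalityKey, String row) {
        table.remove(equalityKey); // DELETE the old record (if any)
        table.put(equalityKey, row); // INSERT the new record
    }

    String get(String equalityKey) {
        return table.get(equalityKey);
    }

    int size() {
        return table.size();
    }

    public static void main(String[] args) {
        UpsertSketch sink = new UpsertSketch();
        sink.upsert("id=1", "v1"); // INSERT
        sink.upsert("id=1", "v2"); // UPDATE_AFTER, replaces v1
        System.out.println(sink.size());      // 1
        System.out.println(sink.get("id=1")); // v2
    }
}
```

In the real sink the delete is written against the table rather than an in-memory map, which is why the partition fields must be a subset of the equality fields: otherwise the new row cannot reach the old row's partition to delete it.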
-
equalityFieldColumns
public FlinkSink.Builder equalityFieldColumns(java.util.List<java.lang.String> columns)
Configure the equality field columns for an iceberg table that accepts CDC or UPSERT events.
Parameters: columns - defines the iceberg table's key.
Returns: FlinkSink.Builder to connect the iceberg table.
-
uidPrefix
public FlinkSink.Builder uidPrefix(java.lang.String newPrefix)
Set the uid prefix for FlinkSink operators. Note that FlinkSink internally consists of multiple operators (such as writer, committer, and dummy sink). The actual operator uid will be appended with a suffix like "uidPrefix-writer".

If provided, this prefix is also applied to operator names.

Flink auto-generates operator uids if they are not set explicitly. It is a recommended best practice to set uids for all operators before deploying to production. Flink has an option, pipeline.auto-generate-uids=false, to disable auto-generation and force explicit setting of all operator uids.

Be careful when setting this for an existing job, because the operator uid changes from an auto-generated one to this new value. When deploying the change with a checkpoint, Flink won't be able to restore the previous Flink sink operator state (more specifically, the committer operator state). You need to use --allowNonRestoredState to ignore the previous sink state. During restore, the Flink sink state is used to check whether the last commit actually succeeded, so --allowNonRestoredState can lead to data loss if the Iceberg commit failed in the last completed checkpoint.
Parameters: newPrefix - prefix for Flink sink operator uid and name
Returns: FlinkSink.Builder to connect the iceberg table.
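The naming convention can be illustrated with a trivial sketch. The "uidPrefix-writer" form comes from the description above; the other suffix name is hypothetical:

```java
import java.util.List;

// Illustration of the uid naming convention described above: each internal
// operator's uid is the configured prefix plus an operator-specific suffix.
// "writer" comes from the Javadoc's example; "committer" is hypothetical.
class UidPrefixSketch {
    static String operatorUid(String uidPrefix, String operator) {
        return uidPrefix + "-" + operator;
    }

    public static void main(String[] args) {
        String prefix = "iceberg-sink"; // value passed to uidPrefix(...)
        for (String op : List.of("writer", "committer")) {
            System.out.println(operatorUid(prefix, op));
        }
        // iceberg-sink-writer
        // iceberg-sink-committer
    }
}
```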
-
setSnapshotProperties
public FlinkSink.Builder setSnapshotProperties(java.util.Map<java.lang.String,java.lang.String> properties)
-
setSnapshotProperty
public FlinkSink.Builder setSnapshotProperty(java.lang.String property, java.lang.String value)
-
toBranch
public FlinkSink.Builder toBranch(java.lang.String branch)
-
append
public org.apache.flink.streaming.api.datastream.DataStreamSink<java.lang.Void> append()
Append the iceberg sink operators to write records to the iceberg table.
Returns: DataStreamSink for the sink.