Flink TableMaintenance
Flink Table Maintenance Batch Mode
Rewrite files action
Iceberg provides an API to rewrite small files into larger files by submitting a Flink batch job. This Flink action behaves the same way as Spark's rewriteDataFiles action.
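```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

// `configuration` (Hadoop Configuration) and `properties` (catalog properties) are defined elsewhere
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table"));
tableLoader.open(); // the loader must be opened before loadTable() is called
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
```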
For more details on the rewrite files action, refer to RewriteDataFilesAction.
Flink Table Maintenance Streaming Mode
Overview
In Apache Iceberg deployments running in Flink streaming environments, automated table maintenance (including snapshot expiration, small file compaction, and orphan file cleanup) is critical for query performance and storage efficiency.
Traditionally, these maintenance operations were available only through Iceberg Spark Actions, which required deploying and managing a dedicated Spark cluster. Depending on Spark infrastructure solely for table optimization adds significant architectural complexity and operational overhead.
The TableMaintenance API in Apache Iceberg lets Flink jobs run maintenance tasks natively, either embedded in an existing streaming pipeline or deployed as a standalone Flink job. This removes the dependency on external systems, simplifying the architecture, reducing operational costs, and improving automation.
Supported Features (Flink)
ExpireSnapshots
Removes old snapshots and their files. Internally it commits with cleanExpiredFiles(true), so files that are no longer referenced by any retained snapshot are cleaned up automatically.
```java
.add(ExpireSnapshots.builder()
    .maxSnapshotAge(Duration.ofDays(7))
    .retainLast(10)
    .deleteBatchSize(1000))
```
RewriteDataFiles
Compacts small files into files closer to the target size. Supports partial-progress commits and a limit on the maximum number of bytes rewritten per run.
```java
.add(RewriteDataFiles.builder()
    .targetFileSizeBytes(256 * 1024 * 1024)
    .minFileSizeBytes(32 * 1024 * 1024)
    .partialProgressEnabled(true)
    .partialProgressMaxCommits(5))
```
Lock Management
The TriggerLockFactory is essential for coordinating maintenance tasks. It prevents concurrent maintenance operations on the same table, which could otherwise lead to conflicts or data corruption. A lock is required even for a single job, because multiple instances of the same task could otherwise run at the same time and conflict.
Why Locks Are Needed
- Concurrent Access: Multiple Flink jobs may attempt maintenance simultaneously
- Data Consistency: Ensures only one maintenance operation runs per table at a time
- Resource Management: Prevents resource conflicts and scheduling issues
- Avoid Duplicate Work: Even when only a single compaction job is scheduled, multiple instances could attempt the same operation, leading to redundant work and wasted resources.
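To make the duplicate-work problem concrete, the sketch below simulates several instances of the same maintenance task racing for one table. It is a minimal in-process illustration under our own assumptions, not the TriggerLockFactory API: `InMemoryLockStore`, `LockDemo`, and their methods are hypothetical, and the atomic `putIfAbsent` stands in for the atomic operation a JDBC or ZooKeeper backend would provide. Only the instance that acquires the lock runs the task; the others skip it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical lock store: at most one owner per lock ID.
class InMemoryLockStore {
  private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

  // Atomically claims the lock; returns false if another owner already holds it.
  boolean tryLock(String lockId, String owner) {
    return locks.putIfAbsent(lockId, owner) == null;
  }

  // Releases the lock only if `owner` currently holds it.
  void unlock(String lockId, String owner) {
    locks.remove(lockId, owner);
  }
}

class LockDemo {
  static final String LOCK_ID = "catalog.db.table";

  // Starts `instances` threads that all try to run maintenance on the same table
  // and returns how many of them actually ran it.
  static int runConcurrently(int instances) {
    InMemoryLockStore store = new InMemoryLockStore();
    AtomicInteger executions = new AtomicInteger();
    CountDownLatch attempted = new CountDownLatch(instances);
    Thread[] threads = new Thread[instances];
    for (int i = 0; i < instances; i++) {
      String owner = "instance-" + i;
      threads[i] = new Thread(() -> {
        boolean held = store.tryLock(LOCK_ID, owner);
        attempted.countDown();
        if (!held) {
          return; // another instance owns the lock, so this one skips the run
        }
        try {
          executions.incrementAndGet(); // the maintenance work would run here
          attempted.await(); // hold the lock until every instance has tried
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          store.unlock(LOCK_ID, owner);
        }
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return executions.get();
  }
}
```

Because the winning instance keeps the lock until every instance has attempted to acquire it, `runConcurrently` reports exactly one execution regardless of thread timing.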
Supported Lock Types
JDBC Lock Factory
Uses a database table to manage distributed locks:
```java
Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");
jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true"); // Auto-create lock table if it doesn't exist

TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg", // JDBC URL
    "catalog.db.table", // Lock ID (unique identifier)
    jdbcProps // JDBC connection properties
);
```
ZooKeeper Lock Factory
Uses Apache ZooKeeper for distributed locks:
```java
TriggerLockFactory lockFactory = new ZkLockFactory(
    "localhost:2181", // ZooKeeper connection string
    "catalog.db.table", // Lock ID (unique identifier)
    60000, // sessionTimeoutMs
    15000, // connectionTimeoutMs
    3000, // baseSleepTimeMs
    3 // maxRetries
);
```
Quick Start
The following example shows how to set up automated maintenance for an Iceberg table in a Flink job.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// `configuration` (Hadoop Configuration) and `properties` (catalog properties) are assumed to be defined elsewhere
TableLoader tableLoader = TableLoader.fromCatalog(
    CatalogLoader.hive("my_catalog", configuration, properties),
    TableIdentifier.of("database", "table")
);

Map<String, String> jdbcProps = new HashMap<>();
jdbcProps.put("jdbc.user", "flink");
jdbcProps.put("jdbc.password", "flinkpw");

// JdbcLockFactory example
TriggerLockFactory lockFactory = new JdbcLockFactory(
    "jdbc:postgresql://localhost:5432/iceberg", // JDBC URL
    "catalog.db.table", // Lock ID (unique identifier)
    jdbcProps // JDBC connection properties
);

TableMaintenance.forTable(env, tableLoader, lockFactory)
    .uidSuffix("my-maintenance-job")
    .rateLimit(Duration.ofMinutes(10))
    .lockCheckDelay(Duration.ofSeconds(10))
    .add(ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .maxSnapshotAge(Duration.ofMinutes(10))
        .retainLast(5)
        .deleteBatchSize(5)
        .parallelism(8))
    .add(RewriteDataFiles.builder()
        .scheduleOnDataFileCount(10)
        .targetFileSizeBytes(128 * 1024 * 1024)
        .partialProgressEnabled(true)
        .partialProgressMaxCommits(10))
    .append();

env.execute("Table Maintenance Job");
```
Configuration Options
TableMaintenance Builder
Method | Description | Default |
---|---|---|
uidSuffix(String) | Unique identifier suffix for the job | Random UUID |
rateLimit(Duration) | Minimum interval between task executions | 60 seconds |
lockCheckDelay(Duration) | Delay for checking lock availability | 30 seconds |
parallelism(int) | Default parallelism for maintenance tasks | System default |
maxReadBack(int) | Max snapshots to check during initialization | 100 |
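One way to picture how rateLimit and lockCheckDelay work together is as a small scheduling decision for a pending trigger. The sketch below is a simplified model under our own assumptions, not Iceberg's implementation: `SchedulingSketch` and its `Decision` enum are hypothetical. A trigger runs only if at least rateLimit has passed since the previous run and the lock is free; if the lock is held elsewhere, availability is checked again after lockCheckDelay.

```java
import java.time.Duration;
import java.time.Instant;

class SchedulingSketch {
  enum Decision { RUN, WAIT_FOR_RATE_LIMIT, RECHECK_LOCK_LATER }

  private final Duration rateLimit;
  private final Duration lockCheckDelay;

  SchedulingSketch(Duration rateLimit, Duration lockCheckDelay) {
    this.rateLimit = rateLimit;
    this.lockCheckDelay = lockCheckDelay;
  }

  // Decides what to do with a pending trigger at time `now`; lastRun is null before the first run.
  Decision decide(Instant lastRun, Instant now, boolean lockFree) {
    if (lastRun != null && Duration.between(lastRun, now).compareTo(rateLimit) < 0) {
      return Decision.WAIT_FOR_RATE_LIMIT; // too soon after the previous execution
    }
    return lockFree ? Decision.RUN : Decision.RECHECK_LOCK_LATER;
  }

  // Earliest time at which the pending trigger is considered again.
  Instant nextAttempt(Instant lastRun, Instant now, Decision decision) {
    switch (decision) {
      case WAIT_FOR_RATE_LIMIT:
        return lastRun.plus(rateLimit);
      case RECHECK_LOCK_LATER:
        return now.plus(lockCheckDelay);
      default:
        return now;
    }
  }
}
```

With a 10-minute rateLimit, a trigger arriving 5 minutes after the last run waits until the 10-minute mark, while a trigger that finds the lock busy is retried lockCheckDelay later.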
Maintenance Task Common Options
Method | Description | Default Value | Type |
---|---|---|---|
scheduleOnCommitCount(int) | Trigger after N commits | No automatic scheduling | int |
scheduleOnDataFileCount(int) | Trigger after N data files | No automatic scheduling | int |
scheduleOnDataFileSize(long) | Trigger after total data file size (bytes) | No automatic scheduling | long |
scheduleOnPosDeleteFileCount(int) | Trigger after N positional delete files | No automatic scheduling | int |
scheduleOnPosDeleteRecordCount(long) | Trigger after N positional delete records | No automatic scheduling | long |
scheduleOnEqDeleteFileCount(int) | Trigger after N equality delete files | No automatic scheduling | int |
scheduleOnEqDeleteRecordCount(long) | Trigger after N equality delete records | No automatic scheduling | long |
scheduleOnInterval(Duration) | Trigger after time interval | No automatic scheduling | Duration |
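When several schedule options are set on one task, it helps to think of them as alternative conditions. The sketch below assumes that the task fires as soon as any configured threshold is reached and that the counters reset after each run; these semantics, the `TriggerSketch` class, and its fields are assumptions for illustration, and only a subset of the options above is modeled. A threshold left unset never fires, matching the "No automatic scheduling" default.

```java
import java.time.Duration;

class TriggerSketch {
  // Configured thresholds; null means "not configured" (no automatic scheduling for that metric).
  Integer commitCount;
  Integer dataFileCount;
  Long dataFileSizeBytes;
  Duration interval;

  // Changes accumulated since the last maintenance run.
  int commitsSinceLastRun;
  int dataFilesSinceLastRun;
  long bytesSinceLastRun;

  // Records one commit that added `files` data files totalling `bytes`.
  void onCommit(int files, long bytes) {
    commitsSinceLastRun++;
    dataFilesSinceLastRun += files;
    bytesSinceLastRun += bytes;
  }

  // Returns true when any configured threshold has been reached.
  boolean shouldTrigger(Duration sinceLastRun) {
    return (commitCount != null && commitsSinceLastRun >= commitCount)
        || (dataFileCount != null && dataFilesSinceLastRun >= dataFileCount)
        || (dataFileSizeBytes != null && bytesSinceLastRun >= dataFileSizeBytes)
        || (interval != null && sinceLastRun.compareTo(interval) >= 0);
  }

  // Clears the counters once the task has been triggered.
  void reset() {
    commitsSinceLastRun = 0;
    dataFilesSinceLastRun = 0;
    bytesSinceLastRun = 0;
  }
}
```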
ExpireSnapshots Configuration
Method | Description | Default Value | Type |
---|---|---|---|
maxSnapshotAge(Duration) | Maximum age of snapshots to retain | 5 days | Duration |
retainLast(int) | Minimum number of snapshots to retain | 1 | int |
deleteBatchSize(int) | Number of files to delete in each batch | 1000 | int |
planningWorkerPoolSize(int) | Number of worker threads for planning snapshot expiration | Shared worker pool | int |
cleanExpiredMetadata(boolean) | Remove expired metadata files when expiring snapshots | false | boolean |
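To illustrate how maxSnapshotAge and retainLast interact, the sketch below picks which snapshots would expire from a list of commit timestamps: a snapshot expires only if it is older than the age cutoff and is not among the retainLast most recent snapshots. This is a simplified model of Iceberg's expire-snapshots semantics that ignores branches, tags, and snapshot ancestry; `ExpirationSketch` is a hypothetical helper.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ExpirationSketch {
  // Returns the commit times of the snapshots that would be expired.
  static List<Instant> toExpire(
      List<Instant> snapshotTimes, Instant now, Duration maxSnapshotAge, int retainLast) {
    Instant cutoff = now.minus(maxSnapshotAge);
    List<Instant> newestFirst = new ArrayList<>(snapshotTimes);
    newestFirst.sort(Comparator.reverseOrder());
    List<Instant> expired = new ArrayList<>();
    for (int i = 0; i < newestFirst.size(); i++) {
      Instant committedAt = newestFirst.get(i);
      boolean keptByRetainLast = i < retainLast; // the newest N snapshots are always kept
      if (!keptByRetainLast && committedAt.isBefore(cutoff)) {
        expired.add(committedAt);
      }
    }
    return expired;
  }
}
```

For nine daily snapshots and a 5-day maxSnapshotAge, the four oldest snapshots expire with retainLast(1), but only two expire with retainLast(7), because the newer bound protects snapshots that are already past the age cutoff.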
RewriteDataFiles Configuration
Method | Description | Default Value | Type |
---|---|---|---|
targetFileSizeBytes(long) | Target size for rewritten files | Table property or 512MB | long |
minFileSizeBytes(long) | Minimum size of files eligible for compaction | 75% of target file size | long |
maxFileSizeBytes(long) | Maximum size of files eligible for compaction | 180% of target file size | long |
minInputFiles(int) | Minimum number of files to trigger rewrite | 5 | int |
deleteFileThreshold(int) | Minimum delete-file count per data file to force rewrite | Integer.MAX_VALUE | int |
rewriteAll(boolean) | Rewrite all data files regardless of thresholds | false | boolean |
maxFileGroupSizeBytes(long) | Maximum total size of a file group | 107374182400 (100GB) | long |
maxFilesToRewrite(int) | Maximum number of files to rewrite; if not set, all eligible files are rewritten | null | int |
partialProgressEnabled(boolean) | Enable partial progress commits | false | boolean |
partialProgressMaxCommits(int) | Maximum commits allowed for partial progress when partialProgressEnabled is true | 10 | int |
maxRewriteBytes(long) | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
filter(Expression) | Filter expression for selecting files to rewrite | Expressions.alwaysTrue() | Expression |
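The size thresholds above determine which files are rewrite candidates. The sketch below is a simplified model that assumes the defaults in the table (minFileSizeBytes is 75% and maxFileSizeBytes is 180% of the target size): a file is a candidate if it is too small, too large, or has at least deleteFileThreshold delete files, and the candidates are rewritten only when there are at least minInputFiles of them, or when rewriteAll is set. `RewriteSelectionSketch` and `FileInfo` are hypothetical; Iceberg's actual planner applies additional rules.

```java
import java.util.ArrayList;
import java.util.List;

class RewriteSelectionSketch {
  // Hypothetical stand-in for a data file's statistics.
  static final class FileInfo {
    final long sizeBytes;
    final int deleteFileCount;

    FileInfo(long sizeBytes, int deleteFileCount) {
      this.sizeBytes = sizeBytes;
      this.deleteFileCount = deleteFileCount;
    }
  }

  // Returns the files selected for rewriting, or an empty list if too few files qualify.
  static List<FileInfo> select(List<FileInfo> files, long targetFileSizeBytes,
      int minInputFiles, int deleteFileThreshold, boolean rewriteAll) {
    if (rewriteAll) {
      return new ArrayList<>(files);
    }
    long minFileSize = targetFileSizeBytes * 3 / 4; // default minFileSizeBytes (75%)
    long maxFileSize = targetFileSizeBytes * 9 / 5; // default maxFileSizeBytes (180%)
    List<FileInfo> candidates = new ArrayList<>();
    for (FileInfo f : files) {
      boolean tooSmall = f.sizeBytes < minFileSize;
      boolean tooLarge = f.sizeBytes > maxFileSize;
      boolean tooManyDeletes = f.deleteFileCount >= deleteFileThreshold;
      if (tooSmall || tooLarge || tooManyDeletes) {
        candidates.add(f);
      }
    }
    return candidates.size() >= minInputFiles ? candidates : new ArrayList<>();
  }
}
```

With a 128 MB target, four 10 MB files are not enough to meet the default minInputFiles of 5, so nothing is rewritten until a fifth small file arrives. A file already at the target size is never a candidate.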
Complete Example
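```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
import org.apache.iceberg.flink.maintenance.api.JdbcLockFactory;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;

public class TableMaintenanceJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60000); // Enable checkpointing every 60 seconds

    // Configure table loader; fill in the Hadoop configuration and catalog
    // properties (for example, the metastore URI) for your environment
    Configuration configuration = new Configuration();
    Map<String, String> properties = new HashMap<>();
    TableLoader tableLoader = TableLoader.fromCatalog(
        CatalogLoader.hive("my_catalog", configuration, properties),
        TableIdentifier.of("database", "table"));

    // Set up JDBC lock factory
    Map<String, String> jdbcProps = new HashMap<>();
    jdbcProps.put("jdbc.user", "flink");
    jdbcProps.put("jdbc.password", "flinkpw");
    jdbcProps.put("flink-maintenance.lock.jdbc.init-lock-tables", "true");
    TriggerLockFactory lockFactory = new JdbcLockFactory(
        "jdbc:postgresql://localhost:5432/iceberg",
        "catalog.db.table",
        jdbcProps);

    // Set up maintenance with comprehensive configuration
    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .uidSuffix("production-maintenance")
        .rateLimit(Duration.ofMinutes(15))
        .lockCheckDelay(Duration.ofSeconds(30))
        .parallelism(4)
        // Daily snapshot cleanup
        .add(ExpireSnapshots.builder()
            .scheduleOnInterval(Duration.ofDays(1))
            .maxSnapshotAge(Duration.ofDays(7))
            .retainLast(10))
        // Continuous file optimization
        .add(RewriteDataFiles.builder()
            .targetFileSizeBytes(256 * 1024 * 1024)
            .minFileSizeBytes(32 * 1024 * 1024)
            .scheduleOnDataFileCount(20)
            .partialProgressEnabled(true)
            .partialProgressMaxCommits(5)
            .maxRewriteBytes(2L * 1024 * 1024 * 1024)
            .parallelism(6))
        .append();

    env.execute("Iceberg Table Maintenance");
  }
}
```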
IcebergSink with Post-Commit Integration
The Apache Iceberg Flink Sink V2 (IcebergSink) can run maintenance tasks automatically after data is committed to the table, using the addPostCommitTopology(...) method.
```java
Map<String, String> flinkConf = new HashMap<>();

// Enable compaction and maintenance features
flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");

// Configure JDBC lock settings
flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), LockConfig.JdbcLockConfig.JDBC);
flinkConf.put(LockConfig.JdbcLockConfig.JDBC_URI_OPTION.key(), "jdbc:postgresql://localhost:5432/iceberg");
flinkConf.put(LockConfig.LOCK_ID_OPTION.key(), "catalog.db.table");

// Add any other maintenance-related options here as needed
// ...

IcebergSink.forRowData(dataStream)
    .table(table)
    .tableLoader(tableLoader)
    .setAll(flinkConf)
    .append();
```
SQL Examples
You can enable maintenance and configure locks using SQL before executing writes:
```sql
-- Enable Iceberg V2 Sink and compaction (maintenance)
SET 'table.exec.iceberg.use.v2.sink' = 'true';
SET 'compaction-enabled' = 'true';

-- Configure maintenance lock (JDBC)
SET 'flink-maintenance.lock.type' = 'jdbc';
SET 'flink-maintenance.lock.lock-id' = 'catalog.db.table';
SET 'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg';
SET 'flink-maintenance.lock.jdbc.init-lock-tables' = 'true';

-- Now run writes; maintenance will be scheduled post-commit
INSERT INTO db.tbl SELECT ...;
```
Alternatively, specify the options in the table DDL:
```sql
CREATE TABLE db.tbl (
  ...
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'my_catalog',
  'catalog-database' = 'db',
  'catalog-table' = 'tbl',
  'compaction-enabled' = 'true',
  'flink-maintenance.lock.type' = 'jdbc',
  'flink-maintenance.lock.lock-id' = 'catalog.db.table',
  'flink-maintenance.lock.jdbc.uri' = 'jdbc:postgresql://localhost:5432/iceberg',
  'flink-maintenance.lock.jdbc.init-lock-tables' = 'true'
);
```
Lock Configuration (SQL)
These keys are used in SQL (SET or table WITH options) and are applicable when writing with compaction enabled.
- JDBC
Key | Description | Default |
---|---|---|
flink-maintenance.lock.type | Set to jdbc | |
flink-maintenance.lock.lock-id | Unique lock ID per table | |
flink-maintenance.lock.jdbc.uri | JDBC URI | |
flink-maintenance.lock.jdbc.init-lock-tables | Auto-create lock table | false |
- ZooKeeper
Key | Description | Default |
---|---|---|
flink-maintenance.lock.type | Set to zookeeper | |
flink-maintenance.lock.lock-id | Unique lock ID per table | |
flink-maintenance.lock.zookeeper.uri | ZooKeeper connection URI | |
flink-maintenance.lock.zookeeper.session-timeout-ms | Session timeout (ms) | 60000 |
flink-maintenance.lock.zookeeper.connection-timeout-ms | Connection timeout (ms) | 15000 |
flink-maintenance.lock.zookeeper.max-retries | Max retries | 3 |
flink-maintenance.lock.zookeeper.base-sleep-ms | Base sleep between retries (ms) | 3000 |
Best Practices
Resource Management
- Use dedicated slot sharing groups for maintenance tasks
- Set appropriate parallelism based on cluster resources
- Enable checkpointing for fault tolerance
Scheduling Strategy
- Use rateLimit to avoid overly frequent executions
- Use scheduleOnCommitCount for write-heavy tables
- Use scheduleOnDataFileCount for fine-grained control
Performance Tuning
- Adjust deleteBatchSize based on storage performance
- Enable partialProgressEnabled for large rewrite operations
- Set reasonable maxRewriteBytes limits
- Set an appropriate maxFileGroupSizeBytes to split large file groups into smaller ones, which increases parallelism and speeds up processing
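To see why maxFileGroupSizeBytes affects parallelism, the sketch below packs candidate files, in order, into groups whose total size stays within the limit; each resulting group could then be rewritten by a separate parallel task. It uses a simple next-fit packing as an assumption. Iceberg's planner has its own bin-packing strategy, and `FileGroupSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class FileGroupSketch {
  // Splits file sizes into consecutive groups whose total does not exceed maxGroupBytes.
  // A single file larger than the limit ends up in a group of its own.
  static List<List<Long>> group(List<Long> fileSizes, long maxGroupBytes) {
    List<List<Long>> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      if (!current.isEmpty() && currentBytes + size > maxGroupBytes) {
        groups.add(current); // close the current group and start a new one
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

Lowering the limit from 1000 to 100 units turns five 40-unit files from one group into three, giving the job three independent units of work instead of one.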
Troubleshooting
OutOfMemoryError during file deletion
Scenario: This can occur when the maintenance task attempts to delete a very large number of files in a single batch, especially in tables with long retention histories or after bulk deletions.
Cause: Each file deletion involves metadata and object store operations, which together can consume significant memory. Large batches magnify this effect and may exhaust the JVM heap.
Recommendation: Reduce deleteBatchSize to limit memory usage during deletion.
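Batching bounds how many file paths go into any single delete request. The sketch below shows the idea with a hypothetical `BatchDeleteSketch` helper and a caller-supplied delete callback standing in for real file I/O; it is not the ExpireSnapshots implementation. A smaller deleteBatchSize makes each request smaller at the cost of more requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchDeleteSketch {
  // Hands `paths` to `deleteBatch` in chunks of at most `batchSize` entries
  // and returns the number of delete calls made.
  static int deleteInBatches(List<String> paths, int batchSize, Consumer<List<String>> deleteBatch) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    int calls = 0;
    for (int start = 0; start < paths.size(); start += batchSize) {
      int end = Math.min(start + batchSize, paths.size());
      deleteBatch.accept(new ArrayList<>(paths.subList(start, end))); // one bounded request
      calls++;
    }
    return calls;
  }
}
```

Deleting 2,500 files with the default deleteBatchSize of 1000 takes three calls, and no call receives more than 1,000 paths.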
Lock conflicts
Scenario: In multi-job or high-availability environments, two or more Flink jobs may attempt maintenance on the same table simultaneously.
Cause: Concurrent jobs compete for the same distributed lock, causing retries and possible delays.
Recommendation: Increase lockCheckDelay and rateLimit so that failed attempts back off and contention is reduced:
```java
.lockCheckDelay(Duration.ofMinutes(1)) // Wait longer before re-checking lock
.rateLimit(Duration.ofMinutes(10)) // Reduce frequency of task execution
```
Slow rewrite operations
Scenario: Large tables with many small files can require rewriting terabytes of data in a single run, which may overwhelm available resources.
Cause: Without limits, rewrite tasks attempt to process all eligible files at once, leading to long execution times and possible job failures.
Recommendation: Enable partialProgressEnabled so that rewritten files are committed in smaller batches, and set maxRewriteBytes to cap the amount of data rewritten in each execution.
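The sketch below illustrates both recommendations under simplifying assumptions. File groups are taken in order until the next one would push the total past maxRewriteBytes, leaving the rest for a later run. The selected groups are then spread over at most partialProgressMaxCommits commits, so finished work becomes visible without waiting for the whole rewrite. The stopping rule, the even split of groups across commits, and the `PartialProgressSketch` class are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class PartialProgressSketch {
  // Selects file groups (given as byte sizes) in order while the running total stays within maxRewriteBytes.
  static List<Long> capByBytes(List<Long> groupSizes, long maxRewriteBytes) {
    List<Long> selected = new ArrayList<>();
    long total = 0;
    for (long size : groupSizes) {
      if (total + size > maxRewriteBytes) {
        break; // remaining groups are left for a later run
      }
      selected.add(size);
      total += size;
    }
    return selected;
  }

  // Splits the selected groups into at most maxCommits consecutive commits of near-equal size.
  static List<List<Long>> planCommits(List<Long> groups, int maxCommits) {
    List<List<Long>> commits = new ArrayList<>();
    if (groups.isEmpty()) {
      return commits;
    }
    int perCommit = (groups.size() + maxCommits - 1) / maxCommits; // ceiling division
    for (int start = 0; start < groups.size(); start += perCommit) {
      int end = Math.min(start + perCommit, groups.size());
      commits.add(new ArrayList<>(groups.subList(start, end)));
    }
    return commits;
  }
}
```

For twelve 1 GB groups and a 10 GB maxRewriteBytes, ten groups are rewritten in this run; with partialProgressMaxCommits(5) they are committed two at a time in five commits.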