Apache Flink supports creating Iceberg tables directly in Flink SQL, without first creating an explicit Flink catalog. That is, an iceberg table can be created simply by specifying the 'connector'='iceberg' table option in Flink SQL, similar to the usage described in the official Flink documentation.

In Flink, the SQL CREATE TABLE test (..) WITH ('connector'='iceberg', ...) creates a Flink table in the current Flink catalog (GenericInMemoryCatalog by default). The Flink table is only a mapping to the underlying iceberg table; the iceberg table itself is not maintained in the current Flink catalog.
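To see which Flink catalog and database a new table will land in, the SQL client can print the current ones. A minimal sketch using standard Flink SQL statements; the outputs shown in the comments assume an unconfigured SQL client session:

SHOW CURRENT CATALOG;
-- default_catalog (the built-in GenericInMemoryCatalog)

SHOW CURRENT DATABASE;
-- default_database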

To create a table in Flink SQL using the syntax CREATE TABLE test (..) WITH ('connector'='iceberg', ...), the Flink iceberg connector provides the following table properties:

- connector: must be set to the constant iceberg.
- catalog-name: a user-specified catalog name; required, since the connector has no default value.
- catalog-type: hive or hadoop for the built-in catalogs, or left unset when catalog-impl points to a custom catalog implementation. Defaults to hive.
- catalog-database: the database name in the backing iceberg catalog; defaults to the current Flink database name.
- catalog-table: the table name in the backing iceberg catalog; defaults to the table name used in the Flink CREATE TABLE statement.

Table managed in Hive catalog

Before executing the following SQL, please make sure you’ve configured the Flink SQL client correctly according to the quick start document.

The following SQL will create a Flink table in the current Flink catalog, which maps to the iceberg table default_database.flink_table managed in the Hive catalog. Because catalog-database and catalog-table are not set, they default to the current Flink database name (default_database) and the Flink table name (flink_table).

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

To create a Flink table mapping to a different iceberg table managed in the Hive catalog (such as hive_db.hive_iceberg_table in Hive), create the Flink table as follows:

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'catalog-database'='hive_db',
    'catalog-table'='hive_iceberg_table',
    'uri'='thrift://localhost:9083',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

Note

If the underlying catalog database (hive_db in the above example) does not already exist, it will be created automatically when records are written into the Flink table.
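For example, assuming the flink_table defined just above, a first write like the following would create hive_db in the Hive Metastore if it were missing (a minimal sketch; the row values are illustrative):

-- First write: creates hive_db in the backing catalog if absent,
-- then commits these records to hive_db.hive_iceberg_table.
INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB');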

Table managed in hadoop catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the iceberg table default_database.flink_table managed in the hadoop catalog.

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
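Just as in the Hive catalog example, catalog-database and catalog-table can override the backing database and table name under a hadoop catalog. A sketch with illustrative names (flink_table2, hadoop_db, and hadoop_iceberg_table are assumptions, not names from the examples above):

-- Maps to the iceberg table hadoop_db.hadoop_iceberg_table, stored under
-- hdfs://nn:8020/path/to/warehouse/hadoop_db/hadoop_iceberg_table
CREATE TABLE flink_table2 (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hadoop_prod',
    'catalog-type'='hadoop',
    'catalog-database'='hadoop_db',
    'catalog-table'='hadoop_iceberg_table',
    'warehouse'='hdfs://nn:8020/path/to/warehouse'
);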

Table managed in custom catalog

The following SQL will create a Flink table in the current Flink catalog, which maps to the iceberg table default_database.flink_table managed in a custom catalog. Note that catalog-type must be left unset when catalog-impl is specified; setting both is an error.

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='custom_prod',
    'catalog-impl'='com.my.custom.CatalogImpl',
    -- More table properties for the customized catalog
    'my-additional-catalog-config'='my-value',
    ...
);

Please check sections under the Integrations tab for all custom catalogs.

A complete example

Take the Hive catalog as an example:

CREATE TABLE flink_table (
    id   BIGINT,
    data STRING
) WITH (
    'connector'='iceberg',
    'catalog-name'='hive_prod',
    'uri'='thrift://localhost:9083',
    'warehouse'='file:///path/to/warehouse'
);

INSERT INTO flink_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC');

SET execution.result-mode=tableau;
SELECT * FROM flink_table;

+----+------+
| id | data |
+----+------+
|  1 |  AAA |
|  2 |  BBB |
|  3 |  CCC |
+----+------+
3 rows in set

For more details, please refer to the Iceberg Flink documentation.