Flink DDL
DDL commands
CREATE Catalog
Hive catalog
This creates an Iceberg catalog named hive_catalog that can be configured using 'catalog-type'='hive', which loads tables from a Hive metastore:
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
The following properties can be set if using the Hive catalog:
- uri: The Hive metastore's thrift URI. (Required)
- clients: The Hive metastore client pool size, default value is 2. (Optional)
- warehouse: The Hive warehouse location. Users should specify this path if they neither set hive-conf-dir to specify a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath.
- hive-conf-dir: Path to a directory containing a hive-site.xml configuration file which will be used to provide custom Hive configuration values. The value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or the Hive configuration file on the classpath) will be overwritten with the warehouse value if both hive-conf-dir and warehouse are set when creating the Iceberg catalog.
- hadoop-conf-dir: Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values.
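For example, a minimal sketch of a Hive catalog that picks up its metastore URI and warehouse location from hive-site.xml instead of setting them inline (the /etc/hive/conf path below is a placeholder for your own configuration directory):
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'hive-conf-dir'='/etc/hive/conf'
);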
Hadoop catalog
Iceberg also supports a directory-based catalog in HDFS that can be configured using 'catalog-type'='hadoop':
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
The following properties can be set if using the Hadoop catalog:
- warehouse: The HDFS directory to store metadata files and data files. (Required)
Execute the following SQL command to set the current catalog:
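USE CATALOG hadoop_catalog;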
REST catalog
This creates an Iceberg catalog named rest_catalog that can be configured using 'catalog-type'='rest', which loads tables from a REST catalog:
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/'
);
The following properties can be set if using the REST catalog:
- uri: The URL to the REST Catalog (Required)
- credential: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)
- token: A token which will be used to interact with the server (Optional)
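For example, a minimal sketch of a REST catalog that authenticates via the OAuth2 client credentials flow; the credential value below is a placeholder <client_id>:<client_secret> pair, not a real secret:
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/',
  'credential'='<client_id>:<client_secret>'
);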
Custom catalog
Flink also supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property:
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
Create through YAML config
Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client.
catalogs: 
  - name: my_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://nn:8020/warehouse/path
Create through SQL Files
The Flink SQL Client supports the -i startup option to execute an initialization SQL file that sets up the environment when the SQL Client starts.
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
USE CATALOG hive_catalog;
Use the -i <init.sql> option to initialize the SQL Client session:
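For example, assuming the SQL Client launcher and the initialization file live at the placeholder paths below:
/path/to/bin/sql-client.sh -i /path/to/init.sql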
CREATE DATABASE
By default, Iceberg will use the default database in Flink. Use the following example to create a separate database and avoid creating tables under the default database:
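CREATE DATABASE iceberg_db;
USE iceberg_db;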
CREATE TABLE
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL
) WITH ('format-version'='2');
Table create commands support the commonly used Flink create clauses, including:
- PARTITIONED BY (column1, column2, ...) to configure partitioning; Flink does not yet support hidden partitioning.
- COMMENT 'table document' to set a table description.
- WITH ('key'='value', ...) to set table configuration which will be stored in Iceberg table properties.
Currently, it does not support computed columns, watermark definitions, etc.
PRIMARY KEY
A primary key constraint can be declared on a column or a set of columns, whose values must be unique and must not contain nulls.
It is required for UPSERT mode.
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) WITH ('format-version'='2');
PARTITIONED BY
To create a partitioned table, use PARTITIONED BY:
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING NOT NULL
) 
PARTITIONED BY (data) 
WITH ('format-version'='2');
Iceberg supports hidden partitioning, but Flink doesn't support partitioning by a function on columns, so there is currently no way to declare hidden partitions in Flink DDL.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);
CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.
ALTER TABLE
Iceberg only supports altering table properties:
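ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');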
ALTER TABLE .. RENAME TO
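To rename a table, run:
ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;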
DROP TABLE
To delete a table, run:
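DROP TABLE `hive_catalog`.`default`.`sample`;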