Spark¶
Iceberg uses Apache Spark’s DataSourceV2 API for data source and catalog implementations. Spark DSv2 is an evolving API with different levels of support in Spark versions.
Feature support | Spark 3.0 | Spark 2.4 | Notes |
---|---|---|---|
SQL create table | ✔️ | | |
SQL create table as | ✔️ | | |
SQL replace table as | ✔️ | | |
SQL alter table | ✔️ | | |
SQL drop table | ✔️ | | |
SQL select | ✔️ | | |
SQL insert into | ✔️ | | |
SQL insert overwrite | ✔️ | | |
DataFrame reads | ✔️ | ✔️ | |
DataFrame append | ✔️ | ✔️ | |
DataFrame overwrite | ✔️ | ✔️ | ⚠ Behavior changed in Spark 3.0 |
DataFrame CTAS and RTAS | ✔️ | | |
Metadata tables | ✔️ | ✔️ | |
Configuring catalogs¶
Spark 3.0 adds an API to plug in table catalogs that are used to load, create, and manage Iceberg tables. Spark catalogs are configured by setting Spark properties under spark.sql.catalog.
This creates an Iceberg catalog named hive_prod that loads tables from a Hive metastore:
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
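These are ordinary Spark properties, so they are usually set in spark-defaults.conf or passed with --conf. As a rough sketch, the same catalog could also be configured when building the SparkSession; the values below mirror the placeholders above and are illustrative:
import org.apache.spark.sql.SparkSession

// Sketch: configure the hive_prod catalog before the session is created
val spark = SparkSession.builder()
  .appName("iceberg-example")
  .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hive_prod.type", "hive")
  .config("spark.sql.catalog.hive_prod.uri", "thrift://metastore-host:port")
  .getOrCreate()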
Iceberg also supports a directory-based catalog in HDFS that can be configured using type=hadoop:
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
Note
The Hive-based catalog only loads Iceberg tables. To load non-Iceberg tables in the same Hive metastore, use a session catalog.
Using catalogs¶
Catalog names are used in SQL queries to identify a table. In the examples above, hive_prod and hadoop_prod can be used to prefix database and table names that will be loaded from those catalogs.
SELECT * FROM hive_prod.db.table -- load db.table from catalog hive_prod
Spark 3 keeps track of the current catalog and namespace, which can be omitted from table names.
USE hive_prod.db;
SELECT * FROM table -- load db.table from catalog hive_prod
To see the current catalog and namespace, run SHOW CURRENT NAMESPACE.
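A quick check from Scala might look like this (the output layout shown in the comment is illustrative):
// Set the current catalog and namespace, then display them
spark.sql("USE hive_prod.db")
spark.sql("SHOW CURRENT NAMESPACE").show()
// +---------+---------+
// |  catalog|namespace|
// +---------+---------+
// |hive_prod|       db|
// +---------+---------+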
Replacing the session catalog¶
To add Iceberg table support to Spark’s built-in catalog, configure spark_catalog to use Iceberg’s SparkSessionCatalog.
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
Spark’s built-in catalog supports existing v1 and v2 tables tracked in a Hive Metastore. This configures Spark to use Iceberg’s SparkSessionCatalog as a wrapper around that session catalog. When a table is not an Iceberg table, the built-in catalog will be used to load it instead.
This configuration can use the same Hive Metastore for both Iceberg and non-Iceberg tables.
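For example, with this configuration in place, Iceberg and non-Iceberg tables in the same metastore resolve through the same identifiers; the table names below are hypothetical:
// db.events is an Iceberg table, handled by SparkSessionCatalog's Iceberg support
spark.sql("SELECT count(*) FROM db.events").show()

// db.legacy_logs is a plain Hive table, loaded by the built-in session catalog
spark.sql("SELECT count(*) FROM db.legacy_logs").show()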
Loading a custom catalog¶
Spark supports loading a custom Iceberg Catalog implementation by specifying the catalog-impl property.
When catalog-impl is set, the value of type is ignored. Here is an example:
spark.sql.catalog.custom_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.custom_prod.catalog-impl = com.my.custom.CatalogImpl
spark.sql.catalog.custom_prod.my-additional-catalog-config = my-value
DDL commands¶
Note
Spark 2.4 can’t create Iceberg tables with DDL; use the Iceberg API instead.
CREATE TABLE¶
Spark 3.0 can create tables in any Iceberg catalog with the clause USING iceberg:
CREATE TABLE prod.db.sample (
id bigint COMMENT 'unique id',
data string)
USING iceberg
Iceberg will convert Spark column types to the corresponding Iceberg types. See the Type compatibility section for details.
Table create commands, including CTAS and RTAS, support the full range of Spark create clauses, including:
- PARTITIONED BY (partition-expressions) to configure partitioning
- LOCATION '(fully-qualified-uri)' to set the table location
- COMMENT 'table documentation' to set a table description
- TBLPROPERTIES ('key'='value', ...) to set table configuration
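For example, here is a sketch that combines several of these clauses in one statement; the location and property values are illustrative:
spark.sql("""
  CREATE TABLE prod.db.sample (
    id bigint COMMENT 'unique id',
    data string)
  USING iceberg
  PARTITIONED BY (bucket(16, id))
  LOCATION 'hdfs://nn:8020/warehouse/path/db/sample'
  COMMENT 'sample table with an explicit location'
  TBLPROPERTIES ('write.format.default'='parquet')
""")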
Create commands may also set the default format with the USING clause. This is only supported for SparkCatalog because Spark handles the USING clause differently for the built-in catalog.
PARTITIONED BY¶
To create a partitioned table, use PARTITIONED BY:
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category)
The PARTITIONED BY clause supports transform expressions to create hidden partitions.
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)
Supported partition transforms are:
- years for yearly partitions
- months for monthly partitions
- days for daily partitions
- hours for hourly partitions
- bucket for bucketing (with width)
- truncate to truncate integers or strings (with length)
CREATE TABLE ... AS SELECT¶
Iceberg supports CTAS as an atomic operation when using a SparkCatalog. CTAS is supported, but is not atomic, when using SparkSessionCatalog.
CREATE TABLE prod.db.sample
USING iceberg
AS SELECT ...
REPLACE TABLE ... AS SELECT¶
Iceberg supports RTAS as an atomic operation when using a SparkCatalog. RTAS is supported, but is not atomic, when using SparkSessionCatalog.
Atomic table replacement creates a new snapshot with the results of the SELECT query, but keeps table history.
REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
CREATE OR REPLACE TABLE prod.db.sample
USING iceberg
AS SELECT ...
The schema and partition spec will be replaced if changed. To avoid modifying the table’s schema and partitioning, use INSERT OVERWRITE instead of REPLACE TABLE.
The new table properties in the REPLACE TABLE command will be merged with any existing table properties. Existing table properties are updated if they are changed; otherwise they are preserved.
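A sketch of the merge behavior, assuming a hypothetical source table prod.db.sample_staging:
spark.sql("""
  REPLACE TABLE prod.db.sample
  USING iceberg
  TBLPROPERTIES ('write.format.default'='avro')  -- merged with the table's existing properties
  AS SELECT id, data FROM prod.db.sample_staging
""")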
ALTER TABLE¶
Iceberg has full ALTER TABLE support in Spark 3, including:
- Renaming a table
- Setting or removing table properties
- Adding, deleting, and renaming columns
- Adding, deleting, and renaming nested fields
- Reordering top-level columns and nested struct fields
- Widening the type of int, float, and decimal fields
- Making required columns optional
ALTER TABLE ... RENAME TO¶
ALTER TABLE prod.db.sample RENAME TO prod.db.new_name
ALTER TABLE ... SET TBLPROPERTIES¶
ALTER TABLE prod.db.sample SET TBLPROPERTIES (
'read.split.target-size'='268435456'
)
Iceberg uses table properties to control table behavior. For a list of available properties, see Table configuration.
UNSET is used to remove properties:
ALTER TABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size')
ALTER TABLE ... ADD COLUMN¶
ALTER TABLE prod.db.sample ADD COLUMN point struct<x: double NOT NULL, y: double NOT NULL> AFTER data
ALTER TABLE prod.db.sample ADD COLUMN point.z double FIRST
ALTER TABLE ... RENAME COLUMN¶
ALTER TABLE prod.db.sample RENAME COLUMN data TO payload
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude
Note that nested rename commands only rename the leaf field. The above command renames location.lat to location.latitude.
ALTER TABLE ... ALTER COLUMN¶
Alter column is used to widen types, make a field optional, set comments, and reorder fields.
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL
ALTER TABLE prod.db.sample ALTER COLUMN location.lat TYPE double
ALTER TABLE prod.db.sample ALTER COLUMN point.z AFTER y
ALTER TABLE prod.db.sample ALTER COLUMN id COMMENT 'unique id'
ALTER TABLE ... DROP COLUMN¶
ALTER TABLE prod.db.sample DROP COLUMN id
ALTER TABLE prod.db.sample DROP COLUMN point.z
DROP TABLE¶
To delete a table, run:
DROP TABLE prod.db.sample
Querying with SQL¶
In Spark 3, tables use identifiers that include a catalog name.
SELECT * FROM prod.db.table -- catalog: prod, namespace: db, table: table
Metadata tables, like history and snapshots, can use the Iceberg table name as a namespace.
For example, to read from the files metadata table for prod.db.table, run:
SELECT * FROM prod.db.table.files
Querying with DataFrames¶
To load a table as a DataFrame, use table:
val df = spark.table("prod.db.table")
Warning
When reading with DataFrames in Spark 3, use table to load a table by name from a catalog unless option is also required.
Using format("iceberg") loads an isolated table reference that is not refreshed when other queries update the table.
Time travel¶
To select a specific table snapshot or the snapshot at some time, Iceberg supports two Spark read options:
- snapshot-id selects a specific table snapshot
- as-of-timestamp selects the current snapshot at a timestamp, in milliseconds
// time travel to October 26, 1986 at 01:21:00
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("path/to/table")
// time travel to snapshot with ID 10963874102873L
spark.read
.option("snapshot-id", 10963874102873L)
.format("iceberg")
.load("path/to/table")
Note
Spark does not currently support using option with table in DataFrameReader commands. All options will be silently ignored. Do not use table when attempting to time-travel or use other options. Options will be supported with table in Spark 3.1 - SPARK-32592.
Time travel is not yet supported by Spark’s SQL syntax.
Spark 2.4¶
Spark 2.4 requires using the DataFrame reader with iceberg as a format, because 2.4 does not support catalogs:
// named metastore table
spark.read.format("iceberg").load("db.table")
// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
Spark 2.4 with SQL¶
To run SQL SELECT statements on Iceberg tables in 2.4, register the DataFrame as a temporary table:
val df = spark.read.format("iceberg").load("db.table")
df.createOrReplaceTempView("table")
spark.sql("""select count(1) from table""").show()
Writing with SQL¶
Spark 3 supports SQL INSERT INTO and INSERT OVERWRITE, as well as the new DataFrameWriterV2 API.
INSERT INTO¶
To append new data to a table, use INSERT INTO.
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...
INSERT OVERWRITE¶
To replace data in the table with the result of a query, use INSERT OVERWRITE. Overwrites are atomic operations for Iceberg tables.
The partitions that will be replaced by INSERT OVERWRITE depend on Spark’s partition overwrite mode and the partitioning of the table.
Warning
Spark 3.0.0 has a correctness bug that affects dynamic INSERT OVERWRITE with hidden partitioning, SPARK-32168. For tables with hidden partitions, wait for Spark 3.0.1.
Overwrite behavior¶
Spark’s default overwrite mode is static, but dynamic overwrite mode is recommended when writing to Iceberg tables. Static overwrite mode determines which partitions to overwrite in a table by converting the PARTITION clause to a filter, but the PARTITION clause can only reference table columns.
Dynamic overwrite mode is configured by setting spark.sql.sources.partitionOverwriteMode=dynamic.
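For example, the mode can be switched for the current session before running the overwrite (a minimal sketch):
// Enable dynamic partition overwrite for this Spark session
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Equivalent SQL form
spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")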
To demonstrate the behavior of dynamic and static overwrites, consider a logs table defined by the following DDL:
CREATE TABLE prod.my_app.logs (
uuid string NOT NULL,
level string NOT NULL,
ts timestamp NOT NULL,
message string)
USING iceberg
PARTITIONED BY (level, hours(ts))
Dynamic overwrite¶
When Spark’s overwrite mode is dynamic, partitions that have rows produced by the SELECT query will be replaced.
For example, this query removes duplicate log events from the example logs table.
INSERT OVERWRITE prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid
In dynamic mode, this will replace any partition with rows in the SELECT result. Because the date of all rows is restricted to 1 July, only hours of that day will be replaced.
Static overwrite¶
When Spark’s overwrite mode is static, the PARTITION clause is converted to a filter that is used to delete from the table. If the PARTITION clause is omitted, all partitions will be replaced.
Because there is no PARTITION clause in the query above, it will drop all existing rows in the table when run in static mode, but will only write the logs from 1 July.
To overwrite just the partitions that were loaded, add a PARTITION clause that aligns with the SELECT query filter:
INSERT OVERWRITE prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid
Note that this mode cannot replace hourly partitions like the dynamic example query because the PARTITION clause can only reference table columns, not hidden partitions.
DELETE FROM¶
Spark 3 added support for DELETE FROM queries to remove data from tables.
Delete queries accept a filter to match rows to delete. Iceberg can delete data as long as the filter matches entire partitions of the table, or it can determine that all rows of a file match. If a file contains some rows that should be deleted and some that should not, Iceberg will throw an exception.
DELETE FROM prod.db.table
WHERE ts >= '2020-05-01 00:00:00' and ts < '2020-06-01 00:00:00'
Writing with DataFrames¶
Spark 3 introduced the new DataFrameWriterV2 API for writing to tables using data frames. The v2 API is recommended for several reasons:
- CTAS, RTAS, and overwrite by filter are supported
- All operations consistently write columns to a table by name
- Hidden partition expressions are supported in partitionedBy
- Overwrite behavior is explicit, either dynamic or by a user-supplied filter
- The behavior of each operation corresponds to SQL statements:
  - df.writeTo(t).create() is equivalent to CREATE TABLE AS SELECT
  - df.writeTo(t).replace() is equivalent to REPLACE TABLE AS SELECT
  - df.writeTo(t).append() is equivalent to INSERT INTO
  - df.writeTo(t).overwritePartitions() is equivalent to dynamic INSERT OVERWRITE
The v1 DataFrame write API is still supported, but is not recommended.
Warning
When writing with the v1 DataFrame API in Spark 3, use saveAsTable or insertInto to load tables with a catalog.
Using format("iceberg") loads an isolated table reference that will not automatically refresh tables used by queries.
Appending data¶
To append a dataframe to an Iceberg table, use append:
val data: DataFrame = ...
data.writeTo("prod.db.table").append()
Spark 2.4¶
In Spark 2.4, use the v1 API with append mode and iceberg format:
data.write
.format("iceberg")
.mode("append")
.save("db.table")
Overwriting data¶
To overwrite partitions dynamically, use overwritePartitions():
val data: DataFrame = ...
data.writeTo("prod.db.table").overwritePartitions()
To explicitly overwrite partitions, use overwrite to supply a filter:
data.writeTo("prod.db.table").overwrite($"level" === "INFO")
Spark 2.4¶
In Spark 2.4, overwrite values in an Iceberg table with overwrite mode and iceberg format:
data.write
.format("iceberg")
.mode("overwrite")
.save("db.table")
Warning
The behavior of overwrite mode changed between Spark 2.4 and Spark 3.
The behavior of DataFrameWriter overwrite mode was undefined in Spark 2.4, but is required to overwrite the entire table in Spark 3. Because of this new requirement, the Iceberg source’s behavior changed in Spark 3. In Spark 2.4, the behavior was to dynamically overwrite partitions. To use the Spark 2.4 behavior, add the option overwrite-mode=dynamic.
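A sketch of adding that option to the v1 writer:
data.write
    .format("iceberg")
    .mode("overwrite")
    .option("overwrite-mode", "dynamic")  // restore the Spark 2.4 dynamic overwrite behavior
    .save("db.table")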
Creating tables¶
To run a CTAS or RTAS, use create, replace, or createOrReplace operations:
val data: DataFrame = ...
data.writeTo("prod.db.table").create()
Create and replace operations support table configuration methods, like partitionedBy and tableProperty:
// $ comes from spark.implicits._; days comes from org.apache.spark.sql.functions
data.writeTo("prod.db.table")
    .tableProperty("write.format.default", "orc")
    .partitionedBy($"level", days($"ts"))
    .createOrReplace()
Writing against partitioned table¶
Iceberg requires data to be sorted according to the partition spec per task (Spark partition) prior to writing to a partitioned table. This applies to both Writing with SQL and Writing with DataFrames.
Note
An explicit sort is necessary because Spark doesn’t allow Iceberg to request a sort before writing as of Spark 3.0. SPARK-23889 is filed to enable Iceberg to require a specific distribution and sort order from Spark.
Note
Both a global sort (orderBy/sort) and a local sort (sortWithinPartitions) satisfy the requirement.
Let’s walk through writing data to the sample table below:
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (days(ts), category)
To write data to the sample table, your data needs to be sorted by days(ts), category.
If you’re inserting data with a SQL statement, you can use ORDER BY to achieve this, as shown below:
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
If you’re inserting data with a DataFrame, you can use either orderBy/sort to trigger a global sort, or sortWithinPartitions to trigger a local sort. For example, a local sort:
data.sortWithinPartitions("ts", "category")
.writeTo("prod.db.sample")
.append()
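For comparison, a global sort for the same write might look like this (a sketch; it sorts the whole dataset rather than within each task):
data.orderBy("ts", "category")
    .writeTo("prod.db.sample")
    .append()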
For most partition transforms, you can simply add the original column to the sort condition; bucket is the exception.
For the bucket partition transform, you need to register the Iceberg transform function in Spark so that it can be referenced in the sort.
Let’s walk through another sample table that has a bucket partition:
CREATE TABLE prod.db.sample (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id))
You need to register the bucket function, as shown below:
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.types.DataTypes
IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
Note
Explicit registration of the function is necessary because Spark doesn’t allow Iceberg to provide functions. SPARK-27658 is filed to enable Iceberg to provide functions which can be used in queries.
Here we just registered the bucket function as iceberg_bucket16, which can be used in the sort clause.
If you’re inserting data with a SQL statement, you can use the function like this:
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
ORDER BY iceberg_bucket16(id)
If you’re inserting data with a DataFrame, you can use the function like this:
import org.apache.spark.sql.functions.expr

data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
    .writeTo("prod.db.sample")
    .append()
Inspecting tables¶
To inspect a table’s history, snapshots, and other metadata, Iceberg supports metadata tables.
Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table.history.
Note
As of Spark 3.0, the format of the table name for inspection (catalog.database.table.metadata) doesn’t work with Spark’s default catalog (spark_catalog). If you’ve replaced the default catalog, you may want to use the DataFrameReader API to inspect the table.
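For example (the same pattern is shown under Inspecting with DataFrames below):
// Load the history metadata table through the DataFrameReader instead of a catalog identifier
spark.read.format("iceberg").load("db.table.history").show(truncate = false)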
History¶
To show table history, run:
SELECT * FROM prod.db.table.history
+-------------------------+---------------------+---------------------+---------------------+
| made_current_at | snapshot_id | parent_id | is_current_ancestor |
+-------------------------+---------------------+---------------------+---------------------+
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true |
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true |
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false |
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true |
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true |
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true |
+-------------------------+---------------------+---------------------+---------------------+
Note
This shows a commit that was rolled back. The example has two snapshots with the same parent, and one is not an ancestor of the current table state.
Snapshots¶
To show the valid snapshots for a table, run:
SELECT * FROM prod.db.table.snapshots
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
| committed_at | snapshot_id | parent_id | operation | manifest_list | summary |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, |
| | | | | | added-data-files -> 438, total-data-files -> 438, |
| | | | | | spark.app.id -> application_1520379288616_155055 } |
| ... | ... | ... | ... | ... | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+-------------------------------------------------------+
You can also join snapshots to table history. For example, this query will show table history, with the application ID that wrote each snapshot:
select
h.made_current_at,
s.operation,
h.snapshot_id,
h.is_current_ancestor,
s.summary['spark.app.id']
from prod.db.table.history h
join prod.db.table.snapshots s
on h.snapshot_id = s.snapshot_id
order by made_current_at
+-------------------------+-----------+----------------+---------------------+----------------------------------+
| made_current_at | operation | snapshot_id | is_current_ancestor | summary[spark.app.id] |
+-------------------------+-----------+----------------+---------------------+----------------------------------+
| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-09 16:24:30.13 | delete | 29641004024753 | false | application_1520379288616_151109 |
| 2019-02-09 16:32:47.336 | append | 57897183625154 | true | application_1520379288616_155055 |
| 2019-02-08 03:47:55.948 | overwrite | 51792995261850 | true | application_1520379288616_152431 |
+-------------------------+-----------+----------------+---------------------+----------------------------------+
Manifests¶
To show a table’s file manifests and each file’s metadata, run:
SELECT * FROM prod.db.table.manifests
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partitions |
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,2019-05-13,2019-05-15]] |
+----------------------------------------------------------------------+--------+-------------------+---------------------+------------------------+---------------------------+--------------------------+---------------------------------+
Files¶
To show a table’s data files and each file’s metadata, run:
SELECT * FROM prod.db.table.files
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
| file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets |
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
| s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] |
| s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] |
| s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] |
+-------------------------------------------------------------------------+-------------+--------------+--------------------+--------------------+------------------+-------------------+-----------------+-----------------+--------------+---------------+
Inspecting with DataFrames¶
Metadata tables can be loaded in Spark 2.4 or Spark 3 using the DataFrameReader API:
// named metastore table
spark.read.format("iceberg").load("db.table.files").show(truncate = false)
// Hadoop path table
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table#files").show(truncate = false)
Type compatibility¶
Spark and Iceberg support different sets of types. Iceberg does the type conversion automatically, but not for all combinations, so you may want to understand Iceberg’s type conversion before designing the column types of your tables.
Spark type to Iceberg type¶
This type conversion table describes how Spark types are converted to Iceberg types. The conversion applies both when creating an Iceberg table and when writing to an Iceberg table via Spark.
Spark | Iceberg | Notes |
---|---|---|
boolean | boolean | |
short | integer | |
byte | integer | |
integer | integer | |
long | long | |
float | float | |
double | double | |
date | date | |
timestamp | timestamp with timezone | |
char | string | |
varchar | string | |
string | string | |
binary | binary | |
decimal | decimal | |
struct | struct | |
array | list | |
map | map |
Note
The table describes the conversion applied when creating a table; broader support is applied on write. Some notes on writes:
- Iceberg numeric types (integer, long, float, double, decimal) support promotion during writes. For example, you can write the Spark types short, byte, integer, and long to the Iceberg type long.
- You can write to the Iceberg fixed type using the Spark binary type. Note that the length will be asserted.
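As a rough sketch of promotion on write, assume a hypothetical Iceberg table prod.db.events whose id column is a long (bigint) and whose data column is a string:
import spark.implicits._

// The DataFrame's id column is a Spark integer; it is promoted to the table's long column on write
val events = Seq((1, "a"), (2, "b")).toDF("id", "data")
events.writeTo("prod.db.events").append()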
Iceberg type to Spark type¶
This type conversion table describes how Iceberg types are converted to Spark types. The conversion applies when reading from an Iceberg table via Spark.
Iceberg | Spark | Notes |
---|---|---|
boolean | boolean | |
integer | integer | |
long | long | |
float | float | |
double | double | |
date | date | |
time | Not supported | |
timestamp with timezone | timestamp | |
timestamp without timezone | Not supported | |
string | string | |
uuid | string | |
fixed | binary | |
binary | binary | |
decimal | decimal | |
struct | struct | |
list | array | |
map | map |