Daft🔗

Daft is a distributed query engine written in Python and Rust, two fast-growing ecosystems in the data engineering and machine learning industries.

It exposes its own flavor of the familiar Python DataFrame API, a common abstraction for querying tables of data in the Python data ecosystem.

Daft DataFrames are a powerful interface for use cases across ML/AI training, batch inference, feature engineering, and traditional analytics. Daft's tight integration with Iceberg unlocks novel capabilities for both traditional analytics and Pythonic ML workloads on your data catalog.

Enabling Iceberg support in Daft🔗

PyIceberg supports reading of Iceberg tables into Daft DataFrames.

To use Iceberg with Daft, ensure that the PyIceberg library is also installed in your current Python environment.

pip install getdaft pyiceberg

Querying Iceberg using Daft🔗

Daft interacts natively with PyIceberg to read Iceberg tables.

Reading Iceberg tables🔗

Setup Steps

To follow along with this code, first create an Iceberg table following the Spark Quickstart tutorial. PyIceberg must then be correctly configured by ensuring that the ~/.pyiceberg.yaml file contains an appropriate catalog entry:

catalog:
  default:
    # URL to the Iceberg REST server Docker container
    uri: http://localhost:8181
    # URL and credentials for the MinIO Docker container
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password

Here is how the Iceberg table demo.nyc.taxis can be loaded into Daft:

import daft
from pyiceberg.catalog import load_catalog

# Configure Daft to use the local MinIO Docker container for any S3 operations
daft.set_planning_config(
    default_io_config=daft.io.IOConfig(
        s3=daft.io.S3Config(endpoint_url="http://localhost:9000"),
    )
)

# Load a PyIceberg table into Daft, and show the first few rows
table = load_catalog("default").load_table("nyc.taxis")
df = daft.read_iceberg(table)
df.show()
╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
│ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │
│ ---       ┆ ---     ┆ ---           ┆ ---         ┆ ---                │
│ Int64     ┆ Int64   ┆ Float32       ┆ Float64     ┆ Utf8               │
╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
│ 1         ┆ 1000371 ┆ 1.8           ┆ 15.32       ┆ N                  │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1         ┆ 1000374 ┆ 8.4           ┆ 42.13       ┆ Y                  │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2         ┆ 1000372 ┆ 2.5           ┆ 22.15       ┆ N                  │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2         ┆ 1000373 ┆ 0.9           ┆ 9.01        ┆ N                  │
╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯

(Showing first 4 of 4 rows)

Note that the operation above will produce a warning from PyIceberg that "no partition filter was specified" and that "this will result in a full table scan". Any filter operation on the Daft DataFrame df will be pushed down to the Iceberg scan, correctly accounting for hidden partitioning and using table statistics to inform query planning for efficient reads.

Let's try the above query again, but this time with a filter applied on the table's partition column "vendor_id" which Daft will correctly use to elide a full table scan.

df = df.where(df["vendor_id"] > 1)
df.show()
╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
│ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │                                                          
│ ---       ┆ ---     ┆ ---           ┆ ---         ┆ ---                │
│ Int64     ┆ Int64   ┆ Float32       ┆ Float64     ┆ Utf8               │
╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
│ 2         ┆ 1000372 ┆ 2.5           ┆ 22.15       ┆ N                  │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2         ┆ 1000373 ┆ 0.9           ┆ 9.01        ┆ N                  │
╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯

(Showing first 2 of 2 rows)
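Conceptually, the pruning behind that filtered read can be illustrated with a small, self-contained sketch (plain Python, no Daft required; the file paths and partition values below are made up for illustration). Each Iceberg data file carries partition metadata, so a predicate on the partition column can skip entire files before any bytes are read:

```python
# Hypothetical list of data files with their partition metadata, mimicking
# the per-file partition values Iceberg records in its manifests.
data_files = [
    {"path": "s3://warehouse/nyc/taxis/vendor_id=1/00000.parquet", "vendor_id": 1},
    {"path": "s3://warehouse/nyc/taxis/vendor_id=2/00001.parquet", "vendor_id": 2},
]

def prune(files, predicate):
    """Keep only the files whose partition value can satisfy the predicate."""
    return [f for f in files if predicate(f["vendor_id"])]

# The filter df["vendor_id"] > 1 prunes the scan down to a single file,
# so the vendor_id=1 file is never opened.
selected = prune(data_files, lambda v: v > 1)
print([f["path"] for f in selected])
```

This is only a sketch of the idea; in practice Daft and PyIceberg perform this pruning against Iceberg's manifest metadata for you.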

Type compatibility🔗

Daft and Iceberg have compatible type systems. Here is how types are converted between the two systems.

Iceberg                      Daft

Primitive Types
boolean                      daft.DataType.bool()
int                          daft.DataType.int32()
long                         daft.DataType.int64()
float                        daft.DataType.float32()
double                       daft.DataType.float64()
decimal(precision, scale)    daft.DataType.decimal128(precision, scale)
date                         daft.DataType.date()
time                         daft.DataType.time(timeunit="us")
timestamp                    daft.DataType.timestamp(timeunit="us", timezone=None)
timestamptz                  daft.DataType.timestamp(timeunit="us", timezone="UTC")
string                       daft.DataType.string()
uuid                         daft.DataType.binary()
fixed(L)                     daft.DataType.binary()
binary                       daft.DataType.binary()

Nested Types
struct(**fields)             daft.DataType.struct(**fields)
list(child_type)             daft.DataType.list(child_type)
map(K, V)                    daft.DataType.map(K, V)
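For quick reference in code, the primitive-type portion of this mapping can be captured as a plain lookup table. This is just a transcription of the table above into a dictionary (a hypothetical helper, not part of Daft's or PyIceberg's public API):

```python
# Iceberg primitive type -> the Daft DataType constructor expression, as a
# string. This dict merely mirrors the conversion table above; it is not a
# Daft or PyIceberg API.
ICEBERG_TO_DAFT = {
    "boolean":     "daft.DataType.bool()",
    "int":         "daft.DataType.int32()",
    "long":        "daft.DataType.int64()",
    "float":       "daft.DataType.float32()",
    "double":      "daft.DataType.float64()",
    "date":        "daft.DataType.date()",
    "time":        'daft.DataType.time(timeunit="us")',
    "timestamp":   'daft.DataType.timestamp(timeunit="us", timezone=None)',
    "timestamptz": 'daft.DataType.timestamp(timeunit="us", timezone="UTC")',
    "string":      "daft.DataType.string()",
    "uuid":        "daft.DataType.binary()",  # UUIDs arrive as 16-byte binary
    "binary":      "daft.DataType.binary()",
}

print(ICEBERG_TO_DAFT["long"])  # daft.DataType.int64()
```

Note that Iceberg's int and long both map to signed Daft integers of the matching width, and that uuid and fixed(L) both lose their fixed-length nature and arrive as plain binary.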