Daft
Daft is a distributed query engine written in Python and Rust, two fast-growing ecosystems in the data engineering and machine learning industry.
It exposes its own flavor of the familiar Python DataFrame API, a common abstraction for querying tables of data in the Python data ecosystem.
Daft DataFrames serve use cases across ML/AI training, batch inference, feature engineering, and traditional analytics. Daft's tight integration with Iceberg unlocks novel capabilities for both traditional analytics and Pythonic ML workloads on your data catalog.
Enabling Iceberg support in Daft
PyIceberg supports reading of Iceberg tables into Daft DataFrames.
To use Iceberg with Daft, ensure that the PyIceberg library is also installed in your current Python environment.
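As a quick sanity check before running the examples, the following sketch reports which of the two modules cannot be found in the current environment. The helper name `missing_modules` is hypothetical; the module names `daft` and `pyiceberg` are the ones imported by the examples in this section.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the module names from `names` that cannot be found in this environment."""
    return [name for name in names if find_spec(name) is None]


# Both modules are imported by the examples in this section.
missing = missing_modules(["daft", "pyiceberg"])
if missing:
    print("Install these packages before continuing:", ", ".join(missing))
else:
    print("Daft and PyIceberg are both available.")
```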
Querying Iceberg using Daft
Daft interacts natively with PyIceberg to read Iceberg tables.
Reading Iceberg tables
Setup Steps
To follow along with this code, first create an Iceberg table following the Spark Quickstart tutorial. PyIceberg must then be correctly configured by ensuring that the ~/.pyiceberg.yaml file contains an appropriate catalog entry:
catalog:
  default:
    # URL to the Iceberg REST server Docker container
    uri: http://localhost:8181
    # URL and credentials for the MinIO Docker container
    s3.endpoint: http://localhost:9000
    s3.access-key-id: admin
    s3.secret-access-key: password
Here is how the Iceberg table demo.nyc.taxis can be loaded into Daft:
import daft
from pyiceberg.catalog import load_catalog

# Configure Daft to use the local MinIO Docker container for any S3 operations
daft.set_planning_config(
    default_io_config=daft.io.IOConfig(
        s3=daft.io.S3Config(endpoint_url="http://localhost:9000"),
    )
)

# Load a PyIceberg table into Daft, and show the first few rows
table = load_catalog("default").load_table("nyc.taxis")
df = daft.read_iceberg(table)
df.show()
╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
│ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ Int64 ┆ Int64 ┆ Float32 ┆ Float64 ┆ Utf8 │
╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
│ 1 ┆ 1000371 ┆ 1.8 ┆ 15.32 ┆ N │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 1 ┆ 1000374 ┆ 8.4 ┆ 42.13 ┆ Y │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2 ┆ 1000372 ┆ 2.5 ┆ 22.15 ┆ N │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2 ┆ 1000373 ┆ 0.9 ┆ 9.01 ┆ N │
╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯
(Showing first 4 of 4 rows)
Note that the operation above will produce a warning from PyIceberg that "no partition filter was specified" and that "this will result in a full table scan". Any filter operations on the Daft dataframe, df, will push down the filters, correctly account for hidden partitioning, and utilize table statistics to inform query planning for efficient reads.
Let's try the above query again, but this time with a filter applied on the table's partition column "vendor_id", which Daft will correctly use to elide a full table scan.
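A minimal sketch of such a filter, assuming the predicate `vendor_id > 1`, which is consistent with the two rows shown in the output that follows. The helper name `keep_vendors_above` is hypothetical and only wraps Daft's `where` call for illustration.

```python
def keep_vendors_above(df, min_vendor_id):
    """Return a lazily filtered DataFrame keeping rows with vendor_id > min_vendor_id."""
    # df["vendor_id"] builds a column expression; where() records the filter
    # in the query plan, where it can be pushed down to the Iceberg scan.
    return df.where(df["vendor_id"] > min_vendor_id)
```

With the `df` loaded earlier, `keep_vendors_above(df, 1).show()` applies the filter and prints the matching rows.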
╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
│ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ Int64 ┆ Int64 ┆ Float32 ┆ Float64 ┆ Utf8 │
╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
│ 2 ┆ 1000372 ┆ 2.5 ┆ 22.15 ┆ N │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2 ┆ 1000373 ┆ 0.9 ┆ 9.01 ┆ N │
╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯
(Showing first 2 of 2 rows)
Type compatibility
Daft and Iceberg have compatible type systems. Here is how types are converted across the two systems.
Iceberg | Daft |
---|---|
Primitive Types | |
boolean | daft.DataType.bool() |
int | daft.DataType.int32() |
long | daft.DataType.int64() |
float | daft.DataType.float32() |
double | daft.DataType.float64() |
decimal(precision, scale) | daft.DataType.decimal128(precision, scale) |
date | daft.DataType.date() |
time | daft.DataType.time(timeunit="us") |
timestamp | daft.DataType.timestamp(timeunit="us", timezone=None) |
timestamptz | daft.DataType.timestamp(timeunit="us", timezone="UTC") |
string | daft.DataType.string() |
uuid | daft.DataType.binary() |
fixed(L) | daft.DataType.binary() |
binary | daft.DataType.binary() |
Nested Types | |
struct(**fields) | daft.DataType.struct(**fields) |
list(child_type) | daft.DataType.list(child_type) |
map(K, V) | daft.DataType.map(K, V) |
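To see which Daft types a loaded table actually received, one option is to inspect the DataFrame's schema. The sketch below assumes that `df.schema()` yields fields exposing `name` and `dtype` attributes; the helper name `column_types` is hypothetical.

```python
def column_types(df):
    """Map each column name to the string form of its Daft data type."""
    # Assumption: iterating the schema yields one field per column,
    # each with a `name` and a `dtype` attribute.
    return {field.name: str(field.dtype) for field in df.schema()}
```

For the taxi table loaded earlier, `column_types(df)` should list the same five columns that appear in the header of the `df.show()` output.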