Skip to content

Quickstart

Iceberg catalog

PyIceberg requires a catalog backend. A SQLite catalog is used here for illustrative purposes. Do not use this in a production setting.

Step 1: Defining the I/O manager

To use dagster-pyiceberg as an I/O manager, you add it to your Definition:

from dagster import Definitions
from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/dag/warehouse/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/dag/warehouse"


resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


defs = Definitions(
    assets=[iris_dataset],
    resources=resources
)

Step 2: Store a Dagster asset as a PyIceberg table

import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

Step 3: Load PyIceberg tables in downstream assets

Dagster and the I/O manager allow you to load the data stored in Iceberg tables into downstream assets:

1
2
3
4
5
6
7
8
9
import pandas as pd

from dagster import asset

# this example uses the iris_dataset asset from Step 2

@asset
def iris_cleaned(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset.dropna().drop_duplicates()