Resources

dagster_pyiceberg.resource.IcebergTableResource

Resource for interacting with a PyIceberg table.

Examples:

from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.resource import IcebergTableResource

warehouse_path = "/path/to/warehouse"

@asset
def my_table(pyiceberg_table: IcebergTableResource):
    df = pyiceberg_table.load().scan().to_pandas()

defs = Definitions(
    assets=[my_table],
    resources={
        "pyiceberg_table": IcebergTableResource(
            name="mycatalog",
            namespace="mynamespace",
            table="mytable",
            config=IcebergCatalogConfig(properties={
                "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                "warehouse": f"file://{warehouse_path}",
            }),
        )
    }
)
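
Because load() returns a PyIceberg Table, you can also push filters and column selection down into the scan instead of reading the whole table. A small sketch (the row_filter expression and column names are illustrative):

@asset
def my_table_summary(pyiceberg_table: IcebergTableResource):
    # Hypothetical filter; assumes the table has "species" and "sepal_length_cm" columns
    df = (
        pyiceberg_table.load()
        .scan(
            row_filter="species == 'setosa'",
            selected_fields=("sepal_length_cm",),
        )
        .to_pandas()
    )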

Configuration

dagster_pyiceberg.config.IcebergCatalogConfig

Configuration for Iceberg Catalogs. See https://py.iceberg.apache.org/configuration/#catalogs for configuration options.

You can configure the PyIceberg IO manager in three ways:

1. Using a `.pyiceberg.yaml` configuration file.
2. Through environment variables.
3. Using the `IcebergCatalogConfig` configuration object.

For more information about the first two configuration options, see https://py.iceberg.apache.org/configuration/#setting-configuration-values
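
For orientation, the catalog properties used throughout this page could equivalently be supplied through the first two options. A sketch following the PyIceberg configuration format (the catalog name and paths are placeholders):

# .pyiceberg.yaml
catalog:
  mycatalog:
    uri: sqlite:////path/to/warehouse/pyiceberg_catalog.db
    warehouse: file:///path/to/warehouse

# Equivalent environment variables
export PYICEBERG_CATALOG__MYCATALOG__URI="sqlite:////path/to/warehouse/pyiceberg_catalog.db"
export PYICEBERG_CATALOG__MYCATALOG__WAREHOUSE="file:///path/to/warehouse"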

Example:

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

warehouse_path = "/path/to/warehouse"
catalog_name = "mycatalog"
namespace = "mynamespace"

io_manager = IcebergPyarrowIOManager(
    name=catalog_name,
    config=IcebergCatalogConfig(properties={
        "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
        "warehouse": f"file://{warehouse_path}",
    }),
    namespace=namespace,
)

IO Managers

dagster_pyiceberg.io_manager.arrow.IcebergPyarrowIOManager

An IO manager definition that reads inputs from and writes outputs to Iceberg tables using PyArrow.

Examples:

import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine one from the assets and ops using the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified by including a "schema" entry in the output metadata. If none of these is provided, the schema defaults to "public". The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
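
For assets, the schema comes from the asset key prefix. A minimal sketch using Dagster's key_prefix:

@asset(key_prefix=["my_schema"])  # written to the "my_schema" namespace
def my_table() -> pd.DataFrame:
    ...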

To use only specific columns of a table as input to a downstream op or asset, add "columns" metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
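
Because the namespace is not created automatically, you may need to create it once up front, for example with PyIceberg directly. A sketch reusing the catalog properties from the example above (passing type="sql" is an assumption for a SQLite-backed catalog):

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "test",
    **{"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE, "type": "sql"},
)
catalog.create_namespace("dagster")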

dagster_pyiceberg.io_manager.polars.IcebergPolarsIOManager

An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Polars.

Examples:

import pandas as pd
import polars as pl
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.polars import IcebergPolarsIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPolarsIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pl.DataFrame:
    return pl.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine one from the assets and ops using the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified by including a "schema" entry in the output metadata. If none of these is provided, the schema defaults to "public". The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To use only specific columns of a table as input to a downstream op or asset, add "columns" metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...

dagster_pyiceberg.io_manager.daft.IcebergDaftIOManager

An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Daft.

Examples:

import daft as da
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.daft import IcebergDaftIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergDaftIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> da.DataFrame:
    return da.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine one from the assets and ops using the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified by including a "schema" entry in the output metadata. If none of these is provided, the schema defaults to "public". The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To use only specific columns of a table as input to a downstream op or asset, add "columns" metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...

dagster_pyiceberg.io_manager.pandas.IcebergPandasIOManager

An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Pandas.

Examples:

import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine one from the assets and ops using the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified by including a "schema" entry in the output metadata. If none of these is provided, the schema defaults to "public". The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To use only specific columns of a table as input to a downstream op or asset, add "columns" metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...

Base classes

dagster_pyiceberg.handler.IcebergBaseTypeHandler

Methods:

  • handle_output

    Stores pyarrow types in Iceberg table

  • load_input

    Loads the input using a dataframe implementation

handle_output

handle_output(context: OutputContext, table_slice: TableSlice, obj: U, connection: Catalog)

Stores pyarrow types in Iceberg table

Source code in src/dagster_pyiceberg/handler.py
def handle_output(
    self,
    context: OutputContext,
    table_slice: TableSlice,
    obj: U,
    connection: Catalog,
):
    """Stores pyarrow types in Iceberg table"""
    metadata = context.definition_metadata or {}

    table_properties_usr = metadata.get("table_properties", {})
    partition_spec_update_mode = metadata.get("partition_spec_update_mode", "error")
    schema_update_mode = metadata.get("schema_update_mode", "error")

    table_writer(
        table_slice=table_slice,
        data=self.to_arrow(obj),
        catalog=connection,
        partition_spec_update_mode=partition_spec_update_mode,
        schema_update_mode=schema_update_mode,
        dagster_run_id=context.run_id,
        dagster_partition_key=(
            context.partition_key if context.has_asset_partitions else None
        ),
        table_properties=table_properties_usr,
    )

    table_ = connection.load_table(f"{table_slice.schema}.{table_slice.table}")

    current_snapshot = cast(Snapshot, table_.current_snapshot())

    context.add_output_metadata(
        {
            "table_columns": MetadataValue.table_schema(
                TableSchema(
                    columns=[
                        TableColumn(name=f["name"], type=str(f["type"]))
                        for f in table_.schema().model_dump()["fields"]
                    ]
                )
            ),
            **current_snapshot.model_dump(),
        }
    )
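
As the source shows, table_properties, partition_spec_update_mode, and schema_update_mode are read from the asset or op definition metadata, with both update modes defaulting to "error". A sketch of supplying them (the "update" value is an assumed alternative to the default; the table property shown is a standard Iceberg property):

@asset(
    metadata={
        "partition_spec_update_mode": "update",  # assumed: allow partition spec evolution
        "schema_update_mode": "update",          # assumed: allow schema evolution
        "table_properties": {"write.parquet.compression-codec": "zstd"},
    }
)
def my_evolving_table() -> pa.Table:
    ...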

load_input

load_input(context: InputContext, table_slice: TableSlice, connection: Catalog) -> U

Loads the input using a dataframe implementation

Source code in src/dagster_pyiceberg/handler.py
def load_input(
    self,
    context: InputContext,
    table_slice: TableSlice,
    connection: Catalog,
) -> U:
    """Loads the input using a dataframe implementation"""
    return self.to_data_frame(
        table=connection.load_table(f"{table_slice.schema}.{table_slice.table}"),
        table_slice=table_slice,
        target_type=context.dagster_type.typing_type,
    )
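
Together, these two methods mean a concrete handler only has to implement the conversions they delegate to: to_arrow for writes and to_data_frame for reads. A minimal sketch of a hypothetical pandas handler (the generic parameter, the scan-based read, and the supported_types property are assumptions based on the signatures above and Dagster's DbTypeHandler interface):

from typing import Sequence, Type

import pandas as pd
import pyarrow as pa
from dagster._core.storage.db_io_manager import TableSlice
from pyiceberg.table import Table

from dagster_pyiceberg.handler import IcebergBaseTypeHandler


class PandasTypeHandler(IcebergBaseTypeHandler[pd.DataFrame]):
    def to_arrow(self, obj: pd.DataFrame) -> pa.Table:
        # Called by handle_output before the data is written to the Iceberg table
        return pa.Table.from_pandas(obj)

    def to_data_frame(
        self, table: Table, table_slice: TableSlice, target_type: Type
    ) -> pd.DataFrame:
        # Called by load_input; honor any "columns" metadata on the input
        fields = tuple(table_slice.columns) if table_slice.columns else ("*",)
        return table.scan(selected_fields=fields).to_pandas()

    @property
    def supported_types(self) -> Sequence[Type]:
        return [pd.DataFrame]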

dagster_pyiceberg.io_manager.base.IcebergIOManager

Base class for an IO manager definition that reads inputs from and writes outputs to Iceberg tables.

Examples:

import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

If you do not provide a schema, Dagster will determine one from the assets and ops using the I/O manager. For assets, the schema is derived from the asset key, as in the example above. For ops, the schema can be specified by including a "schema" entry in the output metadata. If none of these is provided, the schema defaults to "public". The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it is missing.

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...

To use only specific columns of a table as input to a downstream op or asset, add "columns" metadata to the In or AssetIn.

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
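
A custom I/O manager can then be assembled from this base class plus a handler such as the PandasTypeHandler sketched in the base-classes section above. This is a sketch under the assumption that IcebergIOManager follows the type_handlers factory pattern of Dagster's database I/O managers, as the bundled pyarrow, pandas, polars, and daft managers appear to:

from typing import Sequence

from dagster._core.storage.db_io_manager import DbTypeHandler

from dagster_pyiceberg.io_manager.base import IcebergIOManager


class MyIcebergIOManager(IcebergIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        # Register the handler(s) this I/O manager should use
        return [PandasTypeHandler()]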