dagster-pyiceberg integration reference

This reference page provides information for working with dagster-pyiceberg.

Iceberg catalog

PyIceberg requires a catalog backend. A SQLite catalog is used here for illustrative purposes. Do not use this in a production setting. For more information and for catalog configuration settings, visit the PyIceberg documentation.
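
The snippets on this page configure the catalog through IcebergCatalogConfig, whose properties follow the PyIceberg catalog configuration settings. A minimal sketch of the SQLite setup used in the examples below (the paths are placeholders you should adjust for your environment):

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

# illustrative SQLite catalog; swap the properties for your own catalog backend
catalog_config = IcebergCatalogConfig(
    properties={
        "uri": "sqlite:////tmp/examples/catalog.db",  # catalog metadata database
        "warehouse": "file:///tmp/examples/warehouse",  # where table data files are written
    }
)

io_manager = IcebergPandasIOManager(
    name="test",
    config=catalog_config,
    namespace="dagster",
)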

Selecting specific columns in a downstream asset

At times, you might prefer not to retrieve an entire table for a downstream asset. The PyIceberg I/O manager allows you to load specific columns by providing metadata related to the downstream asset.

docs/snippets/select_columns.py
import pandas as pd
from dagster import AssetIn, Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset(
    ins={
        "iris_sepal": AssetIn(
            key="iris_dataset",
            metadata={"columns": ["sepal_length_cm", "sepal_width_cm"]},
        )
    }
)
def sepal_data(iris_sepal: pd.DataFrame) -> pd.DataFrame:
    iris_sepal["sepal_area_cm2"] = (
        iris_sepal["sepal_length_cm"] * iris_sepal["sepal_width_cm"]
    )
    return iris_sepal


defs = Definitions(assets=[iris_dataset, sepal_data], resources=resources)

In this example, we focus exclusively on the columns containing sepal data from the iris_dataset table. To select specific columns, we can include metadata in the input asset. This is done using the metadata parameter of the AssetIn that loads the iris_dataset asset within the ins parameter. We provide the key columns along with a list of the desired column names.

When Dagster materializes sepal_data and retrieves the iris_dataset asset via the PyIceberg I/O manager, it will only extract the sepal_length_cm and sepal_width_cm columns from the dagster.iris_dataset table and deliver them to sepal_data as a Pandas DataFrame.


Storing partitioned assets

The PyIceberg I/O manager facilitates the storage and retrieval of partitioned data. To effectively manage data in the Iceberg table, it is essential for the PyIceberg I/O manager to identify the column that specifies the partition boundaries. This information allows the I/O manager to formulate the appropriate queries for selecting or replacing data.

In the subsequent sections, we will outline how the I/O manager generates these queries for various partition types.

Partition dimensions

For partitioning to function correctly, the partition dimension must correspond to one of the partition columns defined in the Iceberg table. Tables created through the I/O manager will be set up accordingly.
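
To check the partition spec that the I/O manager created, you can open the table directly with PyIceberg. This is a minimal sketch, assuming the SQLite catalog paths and the "dagster" namespace used in the snippets below:

from pyiceberg.catalog.sql import SqlCatalog

# assumes the catalog and warehouse locations from the snippets below
catalog = SqlCatalog(
    "test",
    uri="sqlite:////home/vscode/workspace/.tmp/examples/catalog.db",
    warehouse="file:///home/vscode/workspace/.tmp/examples/warehouse",
)

table = catalog.load_table("dagster.iris_dataset_partitioned")
print(table.spec())  # the partition fields set up by the I/O manager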

To save static partitioned assets in your Iceberg table, you need to set the partition_expr metadata on the asset. This informs the PyIceberg I/O manager which column holds the partition data:

docs/snippets/partitions_static.py
import pandas as pd
from dagster import Definitions, StaticPartitionsDefinition, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset(
    partitions_def=StaticPartitionsDefinition(
        ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
    ),
    metadata={"partition_expr": "species"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    species = context.partition_key

    full_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    return full_df[full_df["species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()


defs = Definitions(assets=[iris_dataset_partitioned, iris_cleaned], resources=resources)

Dagster uses the partition_expr metadata to build the filter used when retrieving the partition in the downstream asset. For static partitions, this is roughly equivalent to the following SQL query:

SELECT *
WHERE [partition_expr] in ([selected partitions])

A partition must be specified when materializing the above assets, as explained in the Materializing partitioned assets documentation. For instance, the query used to materialize the Iris-setosa partition of the assets would be:

SELECT *
WHERE species = 'Iris-setosa'

To store time-partitioned assets, specify partition_expr metadata on the asset just as with static partitions; this tells the PyIceberg I/O manager which column contains the partition data:

docs/snippets/partitions_time.py
import datetime as dt
import random

import pandas as pd
from dagster import DailyPartitionsDefinition, Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


def get_iris_data_for_date(partition: str) -> pd.DataFrame:
    random.seed(876)
    N = 1440
    d = {
        "timestamp": [dt.date.fromisoformat(partition)],
        "species": [
            random.choice(["Iris-setosa", "Iris-virginica", "Iris-versicolor"])
            for _ in range(N)
        ],
        "sepal_length_cm": [random.uniform(0, 1) for _ in range(N)],
        "sepal_width_cm": [random.uniform(0, 1) for _ in range(N)],
        "petal_length_cm": [random.uniform(0, 1) for _ in range(N)],
        "petal_width_cm": [random.uniform(0, 1) for _ in range(N)],
    }
    return pd.DataFrame.from_dict(d)


@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
    metadata={"partition_expr": "time"},
)
def iris_data_per_day(context) -> pd.DataFrame:
    partition = context.partition_key

    # get_iris_data_for_date fetches all of the iris data for a given date;
    # the returned dataframe contains a column named 'time' that stores the
    # timestamp of each row
    return get_iris_data_for_date(partition)


@asset
def iris_cleaned(iris_data_per_day: pd.DataFrame):
    return iris_data_per_day.dropna().drop_duplicates()


defs = Definitions(assets=[iris_data_per_day, iris_cleaned], resources=resources)

Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in the downstream asset. When loading a time-based partition, a statement roughly equivalent to the following is used:

SELECT *
WHERE [partition_expr] = [partition_start]

A partition must be selected when materializing the above assets, as described in the Materializing partitioned assets documentation. The [partition_start] bound is the start of the partition's time window, in the form YYYY-MM-DD HH:MM:SS. In this example, the query when materializing the 2023-01-02 partition of the above assets would be:

SELECT *
WHERE time = '2023-01-02 00:00:00'

The PyIceberg I/O manager can also store data partitioned on multiple dimensions. To do this, specify the column for each partition as a dictionary of partition_expr metadata:

docs/snippets/partitions_multiple.py
import datetime as dt
import random

import pandas as pd
from dagster import (
    DailyPartitionsDefinition,
    Definitions,
    MultiPartitionsDefinition,
    StaticPartitionsDefinition,
    asset,
)

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


def get_iris_data_for_date(partition: str) -> pd.DataFrame:
    random.seed(876)
    N = 1440
    d = {
        "timestamp": [dt.date.fromisoformat(partition)],
        "species": [
            random.choice(["Iris-setosa", "Iris-virginica", "Iris-versicolor"])
            for _ in range(N)
        ],
        "sepal_length_cm": [random.uniform(0, 1) for _ in range(N)],
        "sepal_width_cm": [random.uniform(0, 1) for _ in range(N)],
        "petal_length_cm": [random.uniform(0, 1) for _ in range(N)],
        "petal_width_cm": [random.uniform(0, 1) for _ in range(N)],
    }
    return pd.DataFrame.from_dict(d)


@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={"partition_expr": {"date": "time", "species": "species"}},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    partition = context.partition_key.keys_by_dimension
    species = partition["species"]
    date = partition["date"]

    # get_iris_data_for_date fetches all of the iris data for a given date;
    # the returned dataframe contains a column named 'time' that stores the
    # timestamp of each row
    full_df = get_iris_data_for_date(date)

    return full_df[full_df["species"] == species]


@asset
def iris_cleaned(iris_dataset_partitioned: pd.DataFrame):
    return iris_dataset_partitioned.dropna().drop_duplicates()


defs = Definitions(assets=[iris_dataset_partitioned, iris_cleaned], resources=resources)

Dagster uses the partition_expr metadata to craft the SELECT statement when loading the correct partition in a downstream asset. For multi-partitions, Dagster combines the WHERE clauses described in the sections above to craft the correct SELECT statement.

A partition must be selected when materializing the above assets, as described in the Materializing partitioned assets documentation. For example, when materializing the 2023-01-02|Iris-setosa partition of the above assets, the following query will be used:

SELECT *
WHERE species = 'Iris-setosa'
  AND time = '2023-01-02 00:00:00'

Storing tables in multiple schemas

You may want to have different assets stored in different PyIceberg schemas. The PyIceberg I/O manager allows you to specify the schema in several ways.

If you want all of your assets to be stored in the same schema, you can specify the schema as configuration to the I/O manager.
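
A minimal sketch, assuming the namespace argument used throughout this page is what determines the target schema (the catalog paths are placeholders):

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

# every asset handled by this I/O manager is stored in the "iris" schema
io_manager = IcebergPandasIOManager(
    name="test",
    config=IcebergCatalogConfig(
        properties={
            "uri": "sqlite:////tmp/examples/catalog.db",  # placeholder
            "warehouse": "file:///tmp/examples/warehouse",  # placeholder
        }
    ),
    namespace="iris",
)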

If you want to store assets in different schemas, you can specify the schema as part of the asset's key:

docs/snippets/multiple_schemas.py
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset(key_prefix=["iris"])  # will be stored in "iris" schema
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset(key_prefix=["wine"])  # will be stored in "wine" schema
def wine_dataset() -> pd.DataFrame:
    # assumes the first row of the wine CSV contains column headers,
    # so pandas can infer the column names
    return pd.read_csv(
        "https://gist.githubusercontent.com/tijptjik/9408623/raw/b237fa5848349a14a14e5d4107dc7897c21951f5/wine.csv"
    )


defs = Definitions(assets=[iris_dataset, wine_dataset], resources=resources)

In this example, the iris_dataset asset will be stored in the iris schema, and the wine_dataset asset will be stored in the wine schema.

Specifying a schema

The two options for specifying the schema are mutually exclusive: if you provide schema configuration to the I/O manager, you cannot also provide it via the asset key, and vice versa. If no schema is provided, either from configuration or from asset keys, the default schema public is used.


Using the PyIceberg I/O manager with other I/O managers

You may have assets that you don't want to store in PyIceberg. You can provide an I/O manager to each asset using the io_manager_key parameter in the decorator:

docs/snippets/multiple_io_managers.py
import pandas as pd
from dagster import Definitions, FilesystemIOManager, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"
FS_BASE_DIR = "/home/vscode/workspace/.tmp/examples/images"


resources = {
    "dwh_io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    ),
    "blob_io_manager": FilesystemIOManager(base_dir=FS_BASE_DIR),
}


@asset(io_manager_key="dwh_io_manager")
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset(io_manager_key="blob_io_manager")
def iris_plots(iris_dataset: pd.DataFrame):
    # use pandas' built-in plotting to create a histogram of sepal lengths
    return iris_dataset["sepal_length_cm"].plot.hist()


defs = Definitions(assets=[iris_dataset, iris_plots], resources=resources)

In this example:

  • The iris_dataset asset uses the I/O manager bound to the key dwh_io_manager and iris_plots uses the I/O manager bound to the key blob_io_manager
  • In the Definitions object, we supply the I/O managers for those keys
  • When the assets are materialized, iris_dataset will be stored in PyIceberg, and iris_plots will be saved to the local filesystem

Storing and loading PyArrow, Pandas, Polars, or Daft DataFrames with PyIceberg

The PyIceberg I/O manager also supports storing and loading PyArrow, Polars, and Daft DataFrames.

The pyiceberg package relies heavily on Apache Arrow for efficient data transfer, so PyArrow is natively supported.

You can use IcebergPyarrowIOManager to read and write Iceberg tables:

docs/snippets/io_manager_pyarrow.py
import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

You can use IcebergPandasIOManager to read and write Iceberg tables using Pandas:

docs/snippets/io_manager_pandas.py
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

You can use the IcebergPolarsIOManager to read and write Iceberg tables using Polars, which provides a fully lazy, optimized query engine:

docs/snippets/io_manager_polars.py
import pandas as pd
import polars as pl
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.polars import IcebergPolarsIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPolarsIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pl.DataFrame:
    return pl.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

You can use the IcebergDaftIOManager to read and write Iceberg tables using Daft, which also provides a lazy, optimized query engine:

docs/snippets/io_manager_daft.py
import daft as da
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.daft import IcebergDaftIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergDaftIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> da.DataFrame:
    return da.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)

Interacting with Iceberg tables using the PyIceberg resource

In addition to the PyIceberg I/O manager, Dagster also provides a PyIceberg table resource for working with Iceberg tables directly, for example to run custom table scans inside an asset.

docs/snippets/pyiceberg_resource.py
"""
NB: This snippet assumes that an iceberg table called 'ingested_data' exists.
"""

import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.resource import IcebergTableResource

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/catalog.db"
CATALOG_WAREHOUSE = "file:///home/vscode/workspace/.tmp/examples/warehouse"


@asset
def small_petals(iceberg: IcebergTableResource) -> pd.DataFrame:
    return iceberg.load().scan().to_pandas()


defs = Definitions(
    assets=[small_petals],
    resources={
        "iceberg": IcebergTableResource(
            name="test",
            config=IcebergCatalogConfig(
                properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
            ),
            namespace="dagster",
            table="ingested_data",
        )
    },
)

In this example, we attach the PyIceberg resource to the small_petals asset. In the body of the asset function, we use the load() method to retrieve the PyIceberg Table object, which can then be used for further processing.
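
If you only need part of the table, the scan can be narrowed before converting to Pandas. A minimal sketch, reusing the imports and resource from the snippet above; the asset name, row filter, and selected fields are illustrative:

@asset
def small_petals_only(iceberg: IcebergTableResource) -> pd.DataFrame:
    # illustrative: push a row filter and a column selection down to PyIceberg
    return (
        iceberg.load()
        .scan(
            row_filter="petal_length_cm < 1.0",
            selected_fields=("petal_length_cm", "petal_width_cm", "species"),
        )
        .to_pandas()
    )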

For more information on the PyIceberg resource, see the PyIceberg resource API docs.


Configuring table behavior using table properties

Iceberg tables support table properties to configure table behavior. A full list of available properties can be found in the Iceberg table properties documentation.

Use asset metadata to set table properties:

docs/snippets/table_properties.py
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

CATALOG_URI = "sqlite:////home/vscode/workspace/.tmp/examples/select_columns/catalog.db"
CATALOG_WAREHOUSE = (
    "file:///home/vscode/workspace/.tmp/examples/select_columns/warehouse"
)


resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset(
    metadata={
        "table_properties": {
            "write.parquet.page-size-bytes": "2048",  # 2MB
            "write.parquet.page-row-limit": "10000",
        }
    }
)
def sepal_data(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    iris_dataset["sepal_area_cm2"] = (
        iris_dataset["sepal_length_cm"] * iris_dataset["sepal_width_cm"]
    )
    return iris_dataset


defs = Definitions(assets=[iris_dataset, sepal_data], resources=resources)

Allowing updates to schema and partitions

By default, assets will error when you change the partition spec (e.g. if you change a partition from hourly to daily) or the schema (e.g. when you add a column). You can allow updates to an asset's partition spec and/or schema by adding the following configuration options to the asset metadata:

@asset(
    partitions_def=MultiPartitionsDefinition(
        {
            "date": DailyPartitionsDefinition(start_date="2023-01-01"),
            "species": StaticPartitionDefinition(
                ["Iris-setosa", "Iris-virginica", "Iris-versicolor"]
            ),
        }
    ),
    metadata={
        "partition_expr": {"date": "time", "species": "species"},
        "partition_spec_update_mode": "update",
        "schema_update_mode": "update"
    },
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
    ...

Using the custom DB IO Manager

The dagster-pyiceberg library leans heavily on Dagster's DbIOManager implementation. This IO manager comes with some limitations, however, such as the lack of support for various partition mappings. A custom (experimental) DbIOManager implementation is available that supports partition mappings as long as any time-based partition is consecutive and static partitions are of string type. You can enable it as follows:

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager


IcebergPyarrowIOManager(
    name="my_catalog",
    config=IcebergCatalogConfig(properties={...}),
    namespace="my_schema",
    db_io_manager="custom"
)

For example, a MultiToSingleDimensionPartitionMapping is supported:

@asset(
    key_prefix=["my_schema"],
    partitions_def=daily_partitions_def,
    ins={
        "multi_partitioned_asset": AssetIn(
            ["my_schema", "multi_partitioned_asset_1"],
            partition_mapping=MultiToSingleDimensionPartitionMapping(
                partition_dimension_name="date"
            ),
        )
    },
    metadata={
        "partition_expr": "date_column",
    },
)
def single_partitioned_asset_date(multi_partitioned_asset: pa.Table) -> pa.Table:
    ...

But a SpecificPartitionsPartitionMapping is not supported, because the selected dates are not consecutive:

@asset(
    partitions_def=multi_partition_with_letter,
    key_prefix=["my_schema"],
    metadata={"partition_expr": {"time": "time", "letter": "letter"}},
    ins={
        "multi_partitioned_asset": AssetIn(
            ["my_schema", "multi_partitioned_asset_1"],
            partition_mapping=MultiPartitionMapping(
                {
                    "color": DimensionPartitionMapping(
                        dimension_name="letter",
                        partition_mapping=StaticPartitionMapping(
                            {"blue": "a", "red": "b", "yellow": "c"}
                        ),
                    ),
                    "date": DimensionPartitionMapping(
                        dimension_name="date",
                        partition_mapping=SpecificPartitionsPartitionMapping(
                            ["2022-01-01", "2024-01-01"]
                        ),
                    ),
                }
            ),
        )
    },
)
def mapped_multi_partition(
    context: AssetExecutionContext, multi_partitioned_asset: pa.Table
) -> pa.Table:
    ...