Resources
dagster_pyiceberg.resource.IcebergTableResource
Resource for interacting with a PyIceberg table.
Examples:
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.resource import IcebergTableResource

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
warehouse_path = "/path/to/warehouse"


@asset
def my_table(pyiceberg_table: IcebergTableResource):
    # Load the configured Iceberg table and convert it to a pandas DataFrame.
    df = pyiceberg_table.load().to_pandas()


defs = Definitions(
    assets=[my_table],
    resources={
        "pyiceberg_table": IcebergTableResource(
            name="mycatalog",
            namespace="mynamespace",
            table="mytable",
            config=IcebergCatalogConfig(
                properties={
                    "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
                    "warehouse": f"file://{warehouse_path}",
                }
            ),
        )
    },
)
Configuration
dagster_pyiceberg.config.IcebergCatalogConfig
Configuration for Iceberg Catalogs. See https://py.iceberg.apache.org/configuration/#catalogs for configuration options.
You can configure the PyIceberg IO manager in three ways:
1. Using a `.pyiceberg.yaml` configuration file.
2. Through environment variables.
3. Using the `IcebergCatalogConfig` configuration object.
For more information about the first two configuration options, see https://py.iceberg.apache.org/configuration/#setting-configuration-values
Example:
from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

# Placeholders: replace with your own warehouse directory, catalog name and
# namespace.
warehouse_path = "/path/to/warehouse"
catalog_name = "mycatalog"
namespace = "mynamespace"

io_manager = IcebergPyarrowIOManager(
    name=catalog_name,
    config=IcebergCatalogConfig(
        properties={
            # Catalog connection URI (here: a local SQLite database file).
            "uri": f"sqlite:///{warehouse_path}/pyiceberg_catalog.db",
            # Location where the catalog writes table files.
            "warehouse": f"file://{warehouse_path}",
        }
    ),
    namespace=namespace,
)
IO Managers
dagster_pyiceberg.io_manager.arrow.IcebergPyarrowIOManager
An IO manager definition that reads inputs from and writes outputs to Iceberg tables using PyArrow.
Examples:
import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
WAREHOUSE_PATH = "/path/to/warehouse"
CATALOG_URI = f"sqlite:///{WAREHOUSE_PATH}/catalog.db"
CATALOG_WAREHOUSE = f"file://{WAREHOUSE_PATH}"

resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        # The "dagster" namespace must already exist in the catalog; the I/O
        # manager does not create it.
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    # Return the table so the I/O manager has something to write.
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)
If you do not provide a schema (the `namespace` argument of the I/O manager), Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key (for example, an asset with key `["my_schema", "my_table"]` is stored in the `my_schema` namespace). For ops, the schema can be specified by including a `"schema"` entry in the output metadata, as shown below. If none of these is provided, the schema will default to `"public"`. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it does not exist.
import pyarrow as pa
from dagster import Out, op


# The PyArrow I/O manager works with PyArrow tables, so annotate the output
# as pa.Table.
@op(out={"my_table": Out(metadata={"schema": "my_schema"})})
def make_my_table() -> pa.Table:
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
import pyarrow as pa
from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})})
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...
dagster_pyiceberg.io_manager.polars.IcebergPolarsIOManager
An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Polars.
Examples:
import pandas as pd
import polars as pl
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.polars import IcebergPolarsIOManager

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
WAREHOUSE_PATH = "/path/to/warehouse"
CATALOG_URI = f"sqlite:///{WAREHOUSE_PATH}/catalog.db"
CATALOG_WAREHOUSE = f"file://{WAREHOUSE_PATH}"

resources = {
    "io_manager": IcebergPolarsIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        # The "dagster" namespace must already exist in the catalog; the I/O
        # manager does not create it.
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pl.DataFrame:
    return pl.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)
If you do not provide a schema (the `namespace` argument of the I/O manager), Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key (for example, an asset with key `["my_schema", "my_table"]` is stored in the `my_schema` namespace). For ops, the schema can be specified by including a `"schema"` entry in the output metadata, as shown below. If none of these is provided, the schema will default to `"public"`. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it does not exist.
import polars as pl
from dagster import Out, op


# The Polars I/O manager works with Polars DataFrames, so annotate the output
# as pl.DataFrame.
@op(out={"my_table": Out(metadata={"schema": "my_schema"})})
def make_my_table() -> pl.DataFrame:
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
import polars as pl
from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})})
def my_table_a(my_table: pl.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_pyiceberg.io_manager.daft.IcebergDaftIOManager
An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Daft.
Examples:
import daft as da
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.daft import IcebergDaftIOManager

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
WAREHOUSE_PATH = "/path/to/warehouse"
CATALOG_URI = f"sqlite:///{WAREHOUSE_PATH}/catalog.db"
CATALOG_WAREHOUSE = f"file://{WAREHOUSE_PATH}"

resources = {
    "io_manager": IcebergDaftIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        # The "dagster" namespace must already exist in the catalog; the I/O
        # manager does not create it.
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> da.DataFrame:
    return da.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)
If you do not provide a schema (the `namespace` argument of the I/O manager), Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key (for example, an asset with key `["my_schema", "my_table"]` is stored in the `my_schema` namespace). For ops, the schema can be specified by including a `"schema"` entry in the output metadata, as shown below. If none of these is provided, the schema will default to `"public"`. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it does not exist.
import daft as da
from dagster import Out, op


# The Daft I/O manager works with Daft DataFrames, so annotate the output
# as da.DataFrame.
@op(out={"my_table": Out(metadata={"schema": "my_schema"})})
def make_my_table() -> da.DataFrame:
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
import daft as da
from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})})
def my_table_a(my_table: da.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_pyiceberg.io_manager.pandas.IcebergPandasIOManager
An IO manager definition that reads inputs from and writes outputs to Iceberg tables using Pandas.
Examples:
import pandas as pd
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.pandas import IcebergPandasIOManager

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
WAREHOUSE_PATH = "/path/to/warehouse"
CATALOG_URI = f"sqlite:///{WAREHOUSE_PATH}/catalog.db"
CATALOG_WAREHOUSE = f"file://{WAREHOUSE_PATH}"

resources = {
    "io_manager": IcebergPandasIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        # The "dagster" namespace must already exist in the catalog; the I/O
        # manager does not create it.
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


defs = Definitions(assets=[iris_dataset], resources=resources)
If you do not provide a schema (the `namespace` argument of the I/O manager), Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key (for example, an asset with key `["my_schema", "my_table"]` is stored in the `my_schema` namespace). For ops, the schema can be specified by including a `"schema"` entry in the output metadata, as shown below. If none of these is provided, the schema will default to `"public"`. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it does not exist.
import pandas as pd
from dagster import Out, op


@op(out={"my_table": Out(metadata={"schema": "my_schema"})})
def make_my_table() -> pd.DataFrame:
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
import pandas as pd
from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})})
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
Base classes
dagster_pyiceberg.handler.IcebergBaseTypeHandler
Methods:
- handle_output – Stores PyArrow types in an Iceberg table.
- load_input – Loads the input using a dataframe implementation.
handle_output
Stores PyArrow types in an Iceberg table.
Source code in src/dagster_pyiceberg/handler.py
load_input
Loads the input using a dataframe implementation.
Source code in src/dagster_pyiceberg/handler.py
dagster_pyiceberg.io_manager.base.IcebergIOManager
Base class for an IO manager definition that reads inputs from and writes outputs to Iceberg tables.
Examples:
import pandas as pd
import pyarrow as pa
from dagster import Definitions, asset

from dagster_pyiceberg.config import IcebergCatalogConfig
from dagster_pyiceberg.io_manager.arrow import IcebergPyarrowIOManager

# Placeholder: point this at the directory that should hold the catalog
# database and the table data.
WAREHOUSE_PATH = "/path/to/warehouse"
CATALOG_URI = f"sqlite:///{WAREHOUSE_PATH}/catalog.db"
CATALOG_WAREHOUSE = f"file://{WAREHOUSE_PATH}"

resources = {
    "io_manager": IcebergPyarrowIOManager(
        name="test",
        config=IcebergCatalogConfig(
            properties={"uri": CATALOG_URI, "warehouse": CATALOG_WAREHOUSE}
        ),
        # The "dagster" namespace must already exist in the catalog; the I/O
        # manager does not create it.
        namespace="dagster",
    )
}


@asset
def iris_dataset() -> pa.Table:
    # Return the table so the I/O manager has something to write.
    return pa.Table.from_pandas(
        pd.read_csv(
            "https://docs.dagster.io/assets/iris.csv",
            names=[
                "sepal_length_cm",
                "sepal_width_cm",
                "petal_length_cm",
                "petal_width_cm",
                "species",
            ],
        )
    )


defs = Definitions(assets=[iris_dataset], resources=resources)
If you do not provide a schema (the `namespace` argument of the I/O manager), Dagster will determine a schema based on the assets and ops using the I/O manager. For assets, the schema will be determined from the asset key (for example, an asset with key `["my_schema", "my_table"]` is stored in the `my_schema` namespace). For ops, the schema can be specified by including a `"schema"` entry in the output metadata, as shown below. If none of these is provided, the schema will default to `"public"`. The I/O manager checks whether the namespace exists in the Iceberg catalog; it does not create the namespace automatically if it does not exist.
import pyarrow as pa
from dagster import Out, op


# The example above uses the PyArrow I/O manager, which works with PyArrow
# tables, so annotate the output as pa.Table.
@op(out={"my_table": Out(metadata={"schema": "my_schema"})})
def make_my_table() -> pa.Table:
    ...
To only use specific columns of a table as input to a downstream op or asset, add the metadata "columns" to the In or AssetIn.
import pyarrow as pa
from dagster import AssetIn, asset


@asset(ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})})
def my_table_a(my_table: pa.Table):
    # my_table will just contain the data from column "a"
    ...