Polars

Partitioning

All Polars IoManagers support Hive-style partitioning via the partition_by parameter on the @task decorator. When partition_by includes "backfill_key", the column is auto-injected before writing and auto-filtered on read.

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...

Parquet, CSV, and NDJSON use pl.PartitionBy for LazyFrame sinks. DataFrame writes use native partition_by (Parquet) or .lazy().sink_* with PartitionBy (CSV/NDJSON). Delta uses delta_write_options={"partition_by": ...}.

Reads use hive_partitioning=True (Parquet, CSV, NDJSON) or Delta's native partition pruning.
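
The backfill_key round-trip described above can be sketched in plain Python. This is a hypothetical illustration of the behaviour, not library code; inject_backfill_key and filter_backfill_key are illustrative names.

```python
# Sketch of the backfill_key round-trip: the IoManager tags every written
# row with the current backfill_key, and reads are filtered back down to
# that key unless all partitions are requested.

def inject_backfill_key(rows, backfill_key):
    """Mimic the write side: add the backfill_key column to each row."""
    return [{**row, "backfill_key": backfill_key} for row in rows]

def filter_backfill_key(rows, backfill_key, all_partitions=False):
    """Mimic the read side: keep only the current partition by default."""
    if all_partitions:
        return rows
    return [r for r in rows if r["backfill_key"] == backfill_key]

written = inject_backfill_key([{"region": "eu"}, {"region": "us"}], "2024-01-01")
current = filter_backfill_key(written, "2024-01-01")
```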

Parquet

databricks_bundle_decorators.io_managers.PolarsParquetIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)

Bases: IoManager

Persist Polars DataFrames as Parquet on any cloud or local filesystem.

Automatically dispatches based on return-value type:

  • polars.DataFrame → write_parquet / read_parquet
  • polars.LazyFrame → sink_parquet / scan_parquet

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read; otherwise (including unannotated parameters) a lazy scan_parquet is used by default.
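
A minimal sketch of how annotation-driven dispatch like this can work, using the standard-library inspect module and a stand-in class (the library's actual mechanism, exposed as context.expected_type, may differ):

```python
import inspect

class DataFrame:  # stand-in for pl.DataFrame
    pass

def expected_type(func, param: str):
    """Return the declared annotation of a parameter, or None if absent."""
    ann = inspect.signature(func).parameters[param].annotation
    return None if ann is inspect.Parameter.empty else ann

def eager(df: DataFrame): ...
def lazy(df): ...

# Annotated as DataFrame: eager read. Unannotated: lazy scan by default.
assert expected_type(eager, "df") is DataFrame
assert expected_type(lazy, "df") is None
```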

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Parquet files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = PolarsParquetIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime — this is essential when credentials come from get_dbutils which is only available on a Databricks cluster, not during local bundle deploy::

from databricks_bundle_decorators import get_dbutils

def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}

io = PolarsParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)

A plain dict also works when credentials are known statically::

{"account_name": "...", "account_key": "..."}   # Azure
{"aws_access_key_id": "...", "aws_secret_access_key": "..."}  # S3
None
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_parquet / sink_parquet). For example::

{"compression": "zstd", "row_group_size": 100_000}

Do not include storage_options here — use the dedicated parameter instead.

None
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_parquet / scan_parquet).

None
Example

::

from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options={"account_name": "myaccount", "account_key": "***"},
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:    # sink_parquet on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_parquet on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.
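
Both properties follow the same resolve-if-callable pattern; a minimal sketch:

```python
def resolve(value):
    """Call value if it is callable, otherwise return it unchanged."""
    return value() if callable(value) else value

# A static path and a lazily resolved one behave identically at use time.
assert resolve("/tmp/data") == "/tmp/data"
assert resolve(lambda: "/tmp/data") == "/tmp/data"
assert resolve(None) is None
```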

write(context, obj)

Write a Polars DataFrame or LazyFrame to Parquet.

  • polars.DataFrame → write_parquet (native partition_by)
  • polars.LazyFrame → sink_parquet (pl.PartitionBy)

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories.

Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame to Parquet.

    - `polars.DataFrame` → ``write_parquet`` (native ``partition_by``)
    - `polars.LazyFrame` → ``sink_parquet`` (``pl.PartitionBy``)

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories.
    """
    import polars as pl

    base_uri = self._uri(context.task_key)
    partition_by = context.partition_by

    # Inject backfill_key column if it's a partition column
    if _needs_backfill_key_col(partition_by):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_parquet(
                base_uri,
                partition_by=partition_by,
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsParquetIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.parquet"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_parquet(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsParquetIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)

read(context)

Read Parquet as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_parquet (eager). Otherwise returns scan_parquet (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set on the producing @task, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.
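
The Hive-style layout these reads target puts column=value directory segments under the task's base URI. A hypothetical sketch of that path construction (partition_dir is illustrative, not a library function):

```python
def partition_dir(base_uri: str, partition_values) -> str:
    """Build a Hive-style partition directory: base/col=val/col=val."""
    segments = [f"{col}={val}" for col, val in partition_values]
    return "/".join([base_uri.rstrip("/")] + segments)

path = partition_dir(
    "abfss://lake@acct.dfs.core.windows.net/data/extract",
    [("backfill_key", "2024-01-01"), ("region", "eu")],
)
# The reader then scans f"{base_uri}/**/*.parquet" with
# hive_partitioning=True, so the partition columns come back as columns.
```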

Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def read(self, context: InputContext) -> Any:
    """Read Parquet as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_parquet`` (eager).  Otherwise returns ``scan_parquet``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set on the producing ``@task``, reads
    from the Hive-partitioned directory.  By default only the
    current ``backfill_key`` partition is returned; use
    `all_partitions()` on the upstream dependency or
    ``@task(all_partitions=True)`` on the consuming task to read
    all partitions.
    """
    import polars as pl

    base_uri = self._uri(context.upstream_task_key)
    partition_by = context.partition_by

    if partition_by:
        glob_uri = f"{base_uri}/**/*.parquet"
        if context.expected_type is pl.DataFrame:
            result = pl.read_parquet(
                glob_uri,
                hive_partitioning=True,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_parquet(
                glob_uri,
                hive_partitioning=True,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.parquet"
    if context.expected_type is pl.DataFrame:
        return pl.read_parquet(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_parquet(
        uri, storage_options=self.storage_options, **self._read_options
    )

Delta

databricks_bundle_decorators.io_managers.PolarsDeltaIoManager(base_path, storage_options=None, write_options=None, read_options=None, mode='error', *, auto_filter=True)

Bases: IoManager

Persist Polars DataFrames as Delta tables on any cloud or local filesystem.

Write dispatch:

  • polars.DataFrame → write_delta
  • polars.LazyFrame → sink_delta
  • deltalake.table.TableMerger → .execute() (for merge operations with predicate / action chaining)

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_delta; otherwise (including unannotated parameters) a lazy scan_delta is used by default.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Delta tables. Each task creates a sub-directory named after its task key. Can be a local path, an Azure URI (abfss://…), an S3 URI (s3://…), a GCS URI (gs://…), or any other URI scheme supported by deltalake.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = PolarsDeltaIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars and deltalake I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Note: deltalake uses its own key naming convention for storage options (e.g. AZURE_STORAGE_ACCOUNT_NAME instead of account_name). Consult the deltalake documentation (https://delta-io.github.io/delta-rs/) for the correct keys.

Use a callable to defer credential lookup to runtime::

from databricks_bundle_decorators import get_dbutils

def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"AZURE_STORAGE_ACCOUNT_NAME": "myaccount",
            "AZURE_STORAGE_ACCOUNT_KEY": key}

io = PolarsDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)
None
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_delta / sink_delta). For example::

{"delta_write_options": {"partition_by": ["region"]}}

Do not include storage_options or mode here — they are managed by the IoManager.

None
mode str

Delta write mode. One of "overwrite", "append", "error", or "ignore". Defaults to "error".

For merge operations, ignore this parameter and return a fully-configured deltalake.table.TableMerger from your task instead. The IoManager will call .execute() on it automatically.

'error'
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_delta / scan_delta).

None
Example

::

from databricks_bundle_decorators.io_managers import PolarsDeltaIoManager

io = PolarsDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_delta on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    mode: str = "error",
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self._mode = mode
    self.auto_filter = auto_filter

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.

write(context, obj)

Write a Polars DataFrame, LazyFrame, or TableMerger.

  • polars.DataFrame → write_delta
  • polars.LazyFrame → sink_delta
  • deltalake.table.TableMerger → .execute()

When partition_by is set on the @task decorator, writes with delta_write_options={"partition_by": ...}.

Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame, LazyFrame, or TableMerger.

    - `polars.DataFrame` → ``write_delta``
    - `polars.LazyFrame` → ``sink_delta``
    - `deltalake.table.TableMerger` → ``.execute()``

    When ``partition_by`` is set on the ``@task`` decorator, writes
    with ``delta_write_options={"partition_by": ...}``.
    """
    # Handle merge builders first. The guarded import keeps deltalake
    # optional: it is only needed when a TableMerger is returned.
    _merger_cls: type | None = None
    try:
        from deltalake.table import TableMerger

        _merger_cls = TableMerger
    except ImportError:
        pass

    if _merger_cls is not None and isinstance(obj, _merger_cls):
        obj.execute()
        return

    import polars as pl

    uri = self._uri(context.task_key)
    partition_by = context.partition_by

    # Inject backfill_key column if it's a partition column
    if _needs_backfill_key_col(partition_by):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    # Merge partition_by into write_options for Delta
    write_opts = dict(self._write_options)
    if partition_by:
        delta_opts = write_opts.setdefault("delta_write_options", {})
        delta_opts.setdefault("partition_by", partition_by)

    # Extract partition values from data before writing
    if partition_by:
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )

    # Scope overwrite to affected partitions only
    if self._mode == "overwrite" and partition_by and self._last_partition_values:
        delta_opts = write_opts.setdefault("delta_write_options", {})
        delta_opts.setdefault(
            "predicate", _build_replace_where(self._last_partition_values)
        )

    if isinstance(obj, pl.LazyFrame):
        obj.sink_delta(
            uri,
            mode=self._mode,
            storage_options=self.storage_options,
            **write_opts,
        )
    elif isinstance(obj, pl.DataFrame):
        obj.write_delta(
            uri,
            mode=self._mode,
            storage_options=self.storage_options,
            **write_opts,
        )
    else:
        msg = (
            f"PolarsDeltaIoManager.write() expects a polars.DataFrame, "
            f"polars.LazyFrame, or deltalake TableMerger, "
            f"got {type(obj).__name__}"
        )
        raise TypeError(msg)
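
The partition-scoped overwrite in the code above relies on a replaceWhere-style predicate built from the partition values actually present in the written data. A hypothetical sketch of such a builder (build_replace_where is illustrative; the library's _build_replace_where may format its predicate differently):

```python
def build_replace_where(partition_values: dict) -> str:
    """Build a predicate such as "region IN ('eu', 'us') AND ..." from a
    mapping of partition column -> set of values seen in the data."""
    clauses = []
    for col, values in sorted(partition_values.items()):
        quoted = ", ".join(f"'{v}'" for v in sorted(values))
        clauses.append(f"{col} IN ({quoted})")
    return " AND ".join(clauses)
```

With mode="overwrite", only the partitions named by such a predicate are replaced; rows in other partitions stay untouched.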

read(context)

Read a Delta table as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_delta (eager). Otherwise returns scan_delta (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by includes "backfill_key", reads are filtered to the current partition unless the upstream dependency uses all_partitions() or the consuming task uses @task(all_partitions=True).

Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def read(self, context: InputContext) -> Any:
    """Read a Delta table as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_delta`` (eager).  Otherwise returns ``scan_delta``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` includes ``"backfill_key"``, reads are
    filtered to the current partition unless the upstream
    dependency uses `all_partitions()` or the consuming
    task uses ``@task(all_partitions=True)``.
    """
    import polars as pl

    uri = self._uri(context.upstream_task_key)

    if context.expected_type is pl.DataFrame:
        result = pl.read_delta(
            uri, storage_options=self.storage_options, **self._read_options
        )
    else:
        result = pl.scan_delta(
            uri, storage_options=self.storage_options, **self._read_options
        )

    if context.partition_filter and not context.all_partitions:
        result = _polars_apply_partition_filter(result, context.partition_filter)
    elif (
        self.auto_filter
        and _needs_backfill_key_col(context.partition_by)
        and not context.all_partitions
    ):
        result = result.filter(
            pl.col("backfill_key") == _resolve_backfill_key(context.backfill_key)
        )

    return result

JSON (NDJSON)

databricks_bundle_decorators.io_managers.PolarsJsonIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)

Bases: IoManager

Persist Polars DataFrames as NDJSON on any cloud or local filesystem.

Uses newline-delimited JSON (NDJSON) format, the standard for streaming data pipelines. This is the only JSON variant in Polars that supports cloud storage (storage_options) and lazy I/O.
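
NDJSON is simply one JSON object per line, which is why it streams and appends cleanly; a standard-library round-trip for illustration:

```python
import json

rows = [{"a": 1}, {"a": 2}]

# Write: one compact JSON object per line, newline-terminated.
text = "\n".join(json.dumps(r) for r in rows) + "\n"

# Read: parse line by line. No enclosing array is needed, so files can be
# appended to and consumed incrementally.
parsed = [json.loads(line) for line in text.splitlines()]
assert parsed == rows
```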

Write dispatch:

  • polars.LazyFrame → sink_ndjson
  • polars.DataFrame → .lazy() then sink_ndjson (routed through the lazy path for cloud storage support)

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_ndjson; otherwise (including unannotated parameters) a lazy scan_ndjson is used by default.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for NDJSON files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = PolarsJsonIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime::

from databricks_bundle_decorators import get_dbutils

def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}

io = PolarsJsonIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)
None
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (sink_ndjson).

Do not include storage_options here — use the dedicated parameter instead.

None
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_ndjson / scan_ndjson).

None
Example

::

from databricks_bundle_decorators.io_managers import PolarsJsonIoManager

io = PolarsJsonIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:    # sink_ndjson on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_ndjson on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.

write(context, obj)

Write a Polars DataFrame or LazyFrame as NDJSON.

  • polars.LazyFrame → sink_ndjson
  • polars.DataFrame → .lazy() then sink_ndjson

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories using pl.PartitionBy.

Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame as NDJSON.

    - `polars.LazyFrame` → ``sink_ndjson``
    - `polars.DataFrame` → ``.lazy()`` then ``sink_ndjson``

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories using
    ``pl.PartitionBy``.
    """
    import polars as pl

    base_uri = self._uri(context.task_key)
    partition_by = context.partition_by

    # Inject backfill_key column if it's a partition column
    if _needs_backfill_key_col(partition_by):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_ndjson(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_ndjson(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsJsonIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.ndjson"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_ndjson(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_ndjson(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsJsonIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)

read(context)

Read NDJSON as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_ndjson (eager). Otherwise returns scan_ndjson (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.

Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def read(self, context: InputContext) -> Any:
    """Read NDJSON as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_ndjson`` (eager).  Otherwise returns ``scan_ndjson``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set, reads from the Hive-partitioned
    directory.  By default only the current ``backfill_key``
    partition is returned; use `all_partitions()` on the
    upstream dependency or ``@task(all_partitions=True)`` on
    the consuming task to read all partitions.
    """
    import polars as pl

    base_uri = self._uri(context.upstream_task_key)
    partition_by = context.partition_by

    if partition_by:
        glob_uri = f"{base_uri}/**/*.jsonl"
        if context.expected_type is pl.DataFrame:
            result = pl.read_ndjson(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_ndjson(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.ndjson"
    if context.expected_type is pl.DataFrame:
        return pl.read_ndjson(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_ndjson(
        uri, storage_options=self.storage_options, **self._read_options
    )

CSV

databricks_bundle_decorators.io_managers.PolarsCsvIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)

Bases: IoManager

Persist Polars DataFrames as CSV on any cloud or local filesystem.

Write dispatch:

  • polars.LazyFrame → sink_csv
  • polars.DataFrame → write_csv

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_csv; otherwise (including unannotated parameters) a lazy scan_csv is used by default.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for CSV files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = PolarsCsvIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime::

from databricks_bundle_decorators import get_dbutils

def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}

io = PolarsCsvIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)
None
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_csv / sink_csv). For example::

{"separator": ";", "quote_char": '"'}

Do not include storage_options here — use the dedicated parameter instead.

None
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_csv / scan_csv).

None
Example

::

from databricks_bundle_decorators.io_managers import PolarsCsvIoManager

io = PolarsCsvIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:    # sink_csv on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_csv on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.

write(context, obj)

Write a Polars DataFrame or LazyFrame as CSV.

  • polars.LazyFrame → sink_csv
  • polars.DataFrame → write_csv (single file) or .lazy().sink_csv(PartitionBy) (partitioned)

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories using pl.PartitionBy.

Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame as CSV.

    - `polars.LazyFrame` → ``sink_csv``
    - `polars.DataFrame` → ``write_csv`` (single file) or
      ``.lazy().sink_csv(PartitionBy)`` (partitioned)

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories using
    ``pl.PartitionBy``.
    """
    import polars as pl

    base_uri = self._uri(context.task_key)
    partition_by = context.partition_by

    # Inject backfill_key column if it's a partition column
    if _needs_backfill_key_col(partition_by):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_csv(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_csv(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsCsvIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.csv"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_csv(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_csv(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsCsvIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)

read(context)

Read CSV as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_csv (eager). Otherwise returns scan_csv (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.
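
The precedence between an explicit partition filter, all_partitions, and the automatic backfill_key filter can be modeled as plain Python. This is a simplified sketch of the branch order in read(), assuming the library's `_needs_backfill_key_col` check amounts to membership of `"backfill_key"` in the partition columns:

```python
def choose_filter(partition_filter, all_partitions, auto_filter, partition_by):
    """Decide which filter a partitioned read applies.

    Returns "explicit" (upstream-supplied filter), "backfill"
    (auto-filter to the current backfill_key), or None (read all).
    """
    if all_partitions:
        return None          # caller opted into every partition
    if partition_filter:
        return "explicit"    # explicit filter wins over auto_filter
    if auto_filter and "backfill_key" in partition_by:
        return "backfill"    # default: current backfill_key only
    return None

# Default read of a backfill-partitioned dataset → auto backfill filter.
print(choose_filter(None, False, True, ["backfill_key", "region"]))
# backfill

# all_partitions=True suppresses both filters.
print(choose_filter(None, True, True, ["backfill_key"]))
# None
```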

Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def read(self, context: InputContext) -> Any:
    """Read CSV as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_csv`` (eager).  Otherwise returns ``scan_csv``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set, reads from the Hive-partitioned
    directory.  By default only the current ``backfill_key``
    partition is returned; use `all_partitions()` on the
    upstream dependency or ``@task(all_partitions=True)`` on
    the consuming task to read all partitions.
    """
    import polars as pl

    base_uri = self._uri(context.upstream_task_key)
    partition_by = context.partition_by

    if partition_by:
        glob_uri = f"{base_uri}/**/*.csv"
        if context.expected_type is pl.DataFrame:
            result = pl.read_csv(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_csv(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.csv"
    if context.expected_type is pl.DataFrame:
        return pl.read_csv(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_csv(
        uri, storage_options=self.storage_options, **self._read_options
    )