
Polars

Partitioning

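All Polars IoManagers support Hive-style partitioning via the partition_by parameter on the @task decorator. When partition_by includes "backfill_key", a backfill_key column holding the current backfill key is added to the data before writing, and reads are automatically filtered to that key (pass auto_filter=False to the IoManager to disable the read filter).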

from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...
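The inject-then-filter behaviour can be pictured with plain Python rows. This is a minimal sketch, not the library's code. Rows are dicts instead of Polars frames, and inject_backfill_key and auto_filter_rows are hypothetical names. It also assumes injection is skipped when the data already has a backfill_key column. The write() source passes that check to _should_inject_backfill_key, whose body is not shown here.

```python
from typing import Any

Row = dict[str, Any]


def inject_backfill_key(rows: list[Row], partition_by: list[str], backfill_key: str) -> list[Row]:
    """Add a constant backfill_key column before writing (hypothetical helper)."""
    already_present = bool(rows) and "backfill_key" in rows[0]
    if "backfill_key" not in partition_by or already_present:
        return rows  # nothing to inject
    return [{**row, "backfill_key": backfill_key} for row in rows]


def auto_filter_rows(
    rows: list[Row], partition_by: list[str], backfill_key: str, *, all_partitions: bool = False
) -> list[Row]:
    """Keep only the current backfill_key partition on read (hypothetical helper)."""
    if all_partitions or "backfill_key" not in partition_by:
        return rows
    return [row for row in rows if row["backfill_key"] == backfill_key]


partition_by = ["backfill_key", "region"]
written = inject_backfill_key([{"region": "eu", "v": 1}], partition_by, "2024-01-02")
# The stored data also holds rows from an earlier backfill run:
stored = written + [{"region": "eu", "v": 0, "backfill_key": "2024-01-01"}]
current = auto_filter_rows(stored, partition_by, "2024-01-02")
everything = auto_filter_rows(stored, partition_by, "2024-01-02", all_partitions=True)
```

Reading with all_partitions=True corresponds to using all_partitions() on the upstream dependency or @task(all_partitions=True) on the consumer.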

Parquet, CSV, and NDJSON use pl.PartitionBy for LazyFrame sinks. DataFrame writes use native partition_by (Parquet) or .lazy().sink_* with PartitionBy (CSV/NDJSON). Delta uses delta_write_options={"partition_by": ...}.

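Reads use hive_partitioning=True for Parquet and CSV. The NDJSON reader scans the partition directory with a glob and does not pass hive_partitioning. Delta relies on its native partition pruning.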
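Hive-style partitioning encodes each partition column as a key=value directory segment, and a hive_partitioning reader recovers those columns from the path. Below is a minimal stdlib sketch of both directions. The helper names, the per-asset sub-directory ("extract"), and the part-0.parquet file name are illustrative assumptions. This page does not specify the file names Polars actually writes.

```python
from posixpath import join


def hive_dir(base_uri: str, row: dict, partition_by: list[str]) -> str:
    """Directory for one row: one key=value segment per partition column, in order."""
    return join(base_uri, *(f"{col}={row[col]}" for col in partition_by))


def parse_hive_path(path: str) -> dict[str, str]:
    """Recover partition columns from key=value path segments.

    Values stay strings here; real readers also infer column types.
    """
    values = {}
    for segment in path.split("/"):
        key, sep, value = segment.partition("=")
        if sep and key:
            values[key] = value
    return values


base = "abfss://lake@acct.dfs.core.windows.net/data/extract"
row = {"backfill_key": "2024-01-02", "region": "eu", "amount": 10}
path = hive_dir(base, row, ["backfill_key", "region"]) + "/part-0.parquet"
# abfss://lake@acct.dfs.core.windows.net/data/extract/backfill_key=2024-01-02/region=eu/part-0.parquet
parsed = parse_hive_path(path)  # {"backfill_key": "2024-01-02", "region": "eu"}
```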

Parquet

databricks_bundle_decorators.io_managers.PolarsParquetIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)

Bases: IoManager

Persist Polars DataFrames as Parquet on any cloud or local filesystem.

Automatically dispatches based on return-value type:

  • polars.DataFrame → write_parquet / read_parquet
  • polars.LazyFrame → sink_parquet / scan_parquet

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read; otherwise (including unannotated parameters) a lazy scan_parquet is used by default.
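Annotation-driven dispatch can be illustrated with typing.get_type_hints. This is a hedged sketch, not the library's mechanism. The library compares context.expected_type against pl.DataFrame, as the read() source shows. Here, DataFrame and LazyFrame are stand-in classes so the snippet runs without Polars, and choose_reader is a hypothetical name.

```python
import typing


class DataFrame:  # stand-in for polars.DataFrame
    pass


class LazyFrame:  # stand-in for polars.LazyFrame
    pass


def choose_reader(func, param: str) -> str:
    """Return the reader a parameter's annotation would select (hypothetical)."""
    expected = typing.get_type_hints(func).get(param)
    # Only an explicit DataFrame annotation gives an eager read; LazyFrame,
    # other types and missing annotations all fall back to the lazy scan.
    return "read_parquet" if expected is DataFrame else "scan_parquet"


def eager(df: DataFrame): ...


def lazy(df: LazyFrame): ...


def untyped(df): ...
```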

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Parquet files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters:

from databricks_bundle_decorators import params

io = PolarsParquetIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)

Required (no default).
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime. This is essential when credentials come from get_dbutils, which is only available on a Databricks cluster and not during a local bundle deploy:

from databricks_bundle_decorators import get_dbutils


def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}


io = PolarsParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)

A plain dict also works when credentials are known statically:

{"account_name": "...", "account_key": "..."}  # Azure
{"aws_access_key_id": "...", "aws_secret_access_key": "..."}  # S3

Default: None.
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_parquet / sink_parquet). For example:

{"compression": "zstd", "row_group_size": 100_000}

Do not include storage_options here — use the dedicated parameter instead.

Default: None.
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_parquet / scan_parquet).

Default: None.
retry RetryConfig | None

Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient write conflicts during concurrent backfill runs. Defaults to None (no retries).

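The rule against putting storage_options inside write_options follows from how the options are forwarded. write() passes storage_options=... explicitly and then unpacks **write_options into the same call, so a duplicate key makes Python reject the call before any I/O happens. A minimal sketch with a stand-in writer (fake_write_parquet and forward are hypothetical):

```python
def fake_write_parquet(uri, *, storage_options=None, **kwargs):
    """Stand-in for a Polars writer; returns what it was called with."""
    return {"uri": uri, "storage_options": storage_options, **kwargs}


def forward(uri, storage_options, write_options):
    # Same call shape as write(): explicit storage_options, then **write_options.
    return fake_write_parquet(uri, storage_options=storage_options, **write_options)


ok = forward("/tmp/extract.parquet", None, {"compression": "zstd", "row_group_size": 100_000})

error = ""
try:
    forward("/tmp/extract.parquet", None, {"storage_options": {"account_name": "x"}})
except TypeError as exc:
    # Python refuses the call: "... got multiple values for keyword argument 'storage_options'"
    error = str(exc)
```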
Example

import polars as pl

from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options={"account_name": "myaccount", "account_key": "***"},
)


@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_parquet on write
    return pl.LazyFrame({"a": [1, 2]})


@task
def transform(df: pl.LazyFrame):  # scan_parquet on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter
    self.retry = retry

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.
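Because these properties resolve on access, a callable is evaluated only at runtime on the cluster. For storage_options, it is evaluated again on each read or write. Below is a minimal sketch of that pattern. LazyConfig and _resolve are hypothetical names, and the real properties may differ in detail.

```python
from typing import Callable, TypeVar, Union

T = TypeVar("T")


def _resolve(value: Union[T, Callable[[], T]]) -> T:
    """Call value if it is callable; otherwise return it unchanged."""
    return value() if callable(value) else value


class LazyConfig:
    """Hypothetical holder mirroring the base_path / storage_options properties."""

    def __init__(self, base_path, storage_options=None):
        # Keep the raw values; nothing is called at construction (deploy) time.
        self._base_path = base_path
        self._storage_options = storage_options

    @property
    def base_path(self) -> str:
        return _resolve(self._base_path)

    @property
    def storage_options(self):
        return _resolve(self._storage_options)


calls = []


def _storage_options() -> dict:
    calls.append("lookup")  # on a cluster this is where get_dbutils() would run
    return {"account_name": "myaccount", "account_key": "***"}


cfg = LazyConfig("/tmp/data", storage_options=_storage_options)
calls_at_construction = len(calls)  # 0: nothing resolved yet
first = cfg.storage_options
second = cfg.storage_options  # resolved again on every access
```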

write(context, obj)

Write a Polars DataFrame or LazyFrame to Parquet.

  • polars.DataFrame → write_parquet (native partition_by)
  • polars.LazyFrame → sink_parquet (pl.PartitionBy)

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories.

Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame to Parquet.

    - `polars.DataFrame` → ``write_parquet`` (native ``partition_by``)
    - `polars.LazyFrame` → ``sink_parquet`` (``pl.PartitionBy``)

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.asset_name)
    partition_by = context.partition_by
    _logger.info("Writing to %s (partition_by=%s)", base_uri, partition_by)

    # Inject backfill_key column if it's a partition column
    has_bk_col = isinstance(
        obj, (pl.DataFrame, pl.LazyFrame)
    ) and "backfill_key" in (
        obj.collect_schema().names()
        if isinstance(obj, pl.LazyFrame)
        else obj.columns
    )
    if _should_inject_backfill_key(partition_by, has_backfill_key_col=has_bk_col):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_parquet(
                base_uri,
                partition_by=partition_by,
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsParquetIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.parquet"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_parquet(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_parquet(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsParquetIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)

read(context)

Read Parquet as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_parquet (eager). Otherwise returns scan_parquet (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set on the producing @task, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.

Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
def read(self, context: InputContext) -> Any:
    """Read Parquet as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_parquet`` (eager).  Otherwise returns ``scan_parquet``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set on the producing ``@task``, reads
    from the Hive-partitioned directory.  By default only the
    current ``backfill_key`` partition is returned; use
    `all_partitions()` on the upstream dependency or
    ``@task(all_partitions=True)`` on the consuming task to read
    all partitions.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.upstream_asset_name)
    partition_by = context.partition_by
    _logger.info(
        "Reading from %s (partition_filter=%s)", base_uri, context.partition_filter
    )

    if partition_by:
        glob_uri = f"{base_uri}/**/*.parquet"
        if context.expected_type is pl.DataFrame:
            result = pl.read_parquet(
                glob_uri,
                hive_partitioning=True,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_parquet(
                glob_uri,
                hive_partitioning=True,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.parquet"
    if context.expected_type is pl.DataFrame:
        return pl.read_parquet(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_parquet(
        uri, storage_options=self.storage_options, **self._read_options
    )
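The filter precedence at the end of the partitioned branch of read() can be restated as a small decision function. An explicit partition_filter wins, then the automatic backfill_key filter applies, and all_partitions disables both. This is a hypothetical restatement. It assumes _needs_backfill_key_col(partition_by) simply checks whether "backfill_key" is in partition_by, which the source does not show.

```python
from typing import Optional


def read_filter(
    partition_by: list[str],
    partition_filter: Optional[object],
    *,
    all_partitions: bool,
    auto_filter: bool,
) -> str:
    """Name the filter read() would apply (hypothetical restatement)."""
    if all_partitions:
        return "none"  # all_partitions() / @task(all_partitions=True)
    if partition_filter:
        return "partition_filter"  # explicit filter takes precedence
    if auto_filter and "backfill_key" in partition_by:
        return "backfill_key"  # only the current run's partition
    return "none"
```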

Delta

Merge / Upsert

mode="merge" is not a valid write mode and will raise a ValueError. To perform merge/upsert operations, return a DeltaMerge from your task function. See Delta Write Modes & Merge for full examples.
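The constructor validates mode up front (via _validate_delta_mode in the source), so mode="merge" or a typo fails when the IoManager is created rather than mid-run. Below is a minimal sketch of such a check. validate_delta_mode and the message wording are hypothetical. Only the set of valid modes and the ValueError come from this page.

```python
VALID_DELTA_MODES = ("overwrite", "append", "error", "ignore")


def validate_delta_mode(mode: str, owner: str) -> None:
    """Reject unsupported Delta write modes at construction time (hypothetical)."""
    if mode == "merge":
        raise ValueError(
            f"{owner}: mode='merge' is not a write mode; return a DeltaMerge from the task instead"
        )
    if mode not in VALID_DELTA_MODES:
        raise ValueError(f"{owner}: mode must be one of {VALID_DELTA_MODES}, got {mode!r}")


validate_delta_mode("append", "PolarsDeltaIoManager")  # valid: returns None

message = ""
try:
    validate_delta_mode("merge", "PolarsDeltaIoManager")
except ValueError as exc:
    message = str(exc)
```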

databricks_bundle_decorators.io_managers.PolarsDeltaIoManager(base_path, storage_options=None, write_options=None, read_options=None, mode='error', *, auto_filter=True, retry=None)

Bases: IoManager

Persist Polars DataFrames as Delta tables on any cloud or local filesystem.

Write dispatch:

  • polars.DataFrame → write_delta
  • polars.LazyFrame → sink_delta
  • DeltaMerge → merge/upsert operation

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_delta; otherwise (including unannotated parameters) a lazy scan_delta is used by default.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Delta tables. Each task creates a sub-directory named after its task key. Can be a local path, an Azure URI (abfss://…), an S3 URI (s3://…), a GCS URI (gs://…), or any other URI scheme supported by deltalake.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters:

from databricks_bundle_decorators import params

io = PolarsDeltaIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)

Required (no default).
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars and deltalake I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Note: deltalake uses its own key naming convention for storage options (e.g. AZURE_STORAGE_ACCOUNT_NAME instead of account_name). Consult the deltalake documentation (https://delta-io.github.io/delta-rs/) for the correct keys.

Use a callable to defer credential lookup to runtime:

from databricks_bundle_decorators import get_dbutils


def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {
        "AZURE_STORAGE_ACCOUNT_NAME": "myaccount",
        "AZURE_STORAGE_ACCOUNT_KEY": key,
    }


io = PolarsDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)

Default: None.
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_delta / sink_delta). For example:

{"delta_write_options": {"partition_by": ["region"]}}

Do not include storage_options or mode here — they are managed by the IoManager.

Default: None.
mode str

Delta write mode. One of "overwrite", "append", "error", or "ignore". Defaults to "error".

For merge operations, return a DeltaMerge from your task instead.

read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_delta / scan_delta).

Default: None.
retry RetryConfig | None

Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient Delta commit conflicts during concurrent backfill runs on unpartitioned tables. Defaults to None (no retries).

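The DeltaMerge branch of write_with_retry passes RetryConfig.delay to tenacity's wait_exponential as multiplier, backoff_factor as exp_base and max_delay as max, and stops after max_attempts. The stdlib sketch below reproduces the resulting wait schedule. It assumes tenacity's formula of multiplier * exp_base ** (attempt - 1), capped at max, and that delay is in seconds. backoff_schedule is a hypothetical helper.

```python
from typing import Optional


def backoff_schedule(
    max_attempts: int,
    delay: float,
    backoff_factor: float,
    max_delay: Optional[float] = None,
) -> list[float]:
    """Seconds slept after each failed attempt, before the next one."""
    waits = []
    for attempt in range(1, max_attempts):  # no sleep after the final attempt
        wait = delay * backoff_factor ** (attempt - 1)
        if max_delay is not None:
            wait = min(wait, max_delay)
        waits.append(wait)
    return waits


waits = backoff_schedule(max_attempts=5, delay=1.0, backoff_factor=2.0)  # [1.0, 2.0, 4.0, 8.0]
capped = backoff_schedule(max_attempts=5, delay=1.0, backoff_factor=2.0, max_delay=3.0)  # [1.0, 2.0, 3.0, 3.0]
```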
Example

import polars as pl

from databricks_bundle_decorators.io_managers import PolarsDeltaIoManager

io = PolarsDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)


@task(io_manager=io)
def extract() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2]})


@task
def transform(df: pl.LazyFrame):  # scan_delta on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    mode: str = "error",
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    _validate_delta_mode(mode, type(self).__name__)
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self._mode = mode
    self.auto_filter = auto_filter
    self.retry = retry

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.

write(context, obj)

Write a Polars DataFrame, LazyFrame, or DeltaMerge.

  • polars.DataFrame → write_delta
  • polars.LazyFrame → sink_delta
  • DeltaMerge → merge/upsert operation

When partition_by is set on the @task decorator, writes with delta_write_options={"partition_by": ...}.

Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame, LazyFrame, or DeltaMerge.

    - `polars.DataFrame` → ``write_delta``
    - `polars.LazyFrame` → ``sink_delta``
    - `DeltaMerge` → merge/upsert operation

    When ``partition_by`` is set on the ``@task`` decorator, writes
    with ``delta_write_options={"partition_by": ...}``.
    """
    # Handle DeltaMerge definitions — build a fresh merger and execute.
    if isinstance(obj, DeltaMerge):
        uri = self._uri(context.asset_name)
        _logger.info(
            "Merging into %s (predicate=%r, actions=%s)",
            uri,
            obj.predicate,
            obj._describe_actions(),
        )
        merger = obj._build_merger(uri, storage_options=self.storage_options)
        if merger is None:
            # Target table doesn't exist yet — write source data directly.
            obj._initial_write(
                uri,
                storage_options=self.storage_options,
                partition_by=context.partition_by,
                write_options=dict(self._write_options),
            )
        else:
            merger.execute()
        if context.partition_by:
            self._last_partition_values = _polars_extract_partition_values(
                obj.source, context.partition_by
            )
        else:
            self._last_partition_values = {}
        return

    import polars as pl  # noqa: PLC0415

    uri = self._uri(context.asset_name)
    partition_by = context.partition_by

    # Inject backfill_key column if it's a partition column
    has_bk_col = isinstance(
        obj, (pl.DataFrame, pl.LazyFrame)
    ) and "backfill_key" in (
        obj.collect_schema().names()
        if isinstance(obj, pl.LazyFrame)
        else obj.columns
    )
    if _should_inject_backfill_key(partition_by, has_backfill_key_col=has_bk_col):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

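    # Merge partition_by into write_options for Delta. Copy the nested
    # delta_write_options dict as well: dict() is a shallow copy, so the
    # setdefault() calls below would otherwise write "partition_by" and
    # "predicate" back into self._write_options and leak into later writes.
    write_opts = dict(self._write_options)
    if "delta_write_options" in write_opts:
        write_opts["delta_write_options"] = dict(write_opts["delta_write_options"])
    if partition_by:
        delta_opts = write_opts.setdefault("delta_write_options", {})
        delta_opts.setdefault("partition_by", partition_by)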

    # Extract partition values from data before writing
    if partition_by:
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )

    # Scope overwrite to affected partitions only
    if self._mode == "overwrite" and partition_by and self._last_partition_values:
        delta_opts = write_opts.setdefault("delta_write_options", {})
        delta_opts.setdefault(
            "predicate", _build_replace_where(self._last_partition_values)
        )

    _logger.info(
        "Writing to %s (mode=%s, partition_by=%s)", uri, self._mode, partition_by
    )

    if isinstance(obj, pl.LazyFrame):
        obj.sink_delta(
            uri,
            mode=self._mode,
            storage_options=self.storage_options,
            **write_opts,
        )
    elif isinstance(obj, pl.DataFrame):
        obj.write_delta(
            uri,
            mode=self._mode,
            storage_options=self.storage_options,
            **write_opts,
        )
    else:
        msg = (
            f"PolarsDeltaIoManager.write() expects a polars.DataFrame, "
            f"polars.LazyFrame, or DeltaMerge, "
            f"got {type(obj).__name__}"
        )
        raise TypeError(msg)
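Two helpers used by write() are not shown on this page: _polars_extract_partition_values and _build_replace_where. The sketch below is a hypothetical stand-in that makes the partition-scoped overwrite concrete. It assumes partition values are gathered as a column-to-distinct-values mapping and turned into a SQL-style predicate for delta_write_options["predicate"]. The real predicate syntax may differ. It also shows the setdefault precedence: a partition_by the user already placed in delta_write_options wins over the task's.

```python
def extract_partition_values(rows, partition_by):
    """Map each partition column to its sorted distinct values (hypothetical)."""
    if not rows:
        return {}
    return {col: sorted({str(row[col]) for row in rows}) for col in partition_by}


def build_replace_where(values):
    """Build e.g. "region IN ('eu', 'us')" clauses joined by AND (hypothetical syntax)."""
    clauses = []
    for col, vals in values.items():
        quoted = ", ".join("'" + v.replace("'", "''") + "'" for v in vals)
        clauses.append(f"{col} IN ({quoted})")
    return " AND ".join(clauses)


def delta_write_options(user_opts, partition_by, mode, rows):
    """Merge task-level partitioning into user write options (hypothetical)."""
    opts = dict(user_opts)
    if not partition_by:
        return opts
    # Copy the nested dict too, so the caller's options are never mutated.
    delta_opts = dict(opts.get("delta_write_options", {}))
    opts["delta_write_options"] = delta_opts
    delta_opts.setdefault("partition_by", partition_by)  # a user-supplied value wins
    values = extract_partition_values(rows, partition_by)
    if mode == "overwrite" and values:
        # Only replace the partitions present in this batch.
        delta_opts.setdefault("predicate", build_replace_where(values))
    return opts


rows = [
    {"backfill_key": "2024-01-02", "region": "eu"},
    {"backfill_key": "2024-01-02", "region": "us"},
]
opts = delta_write_options({}, ["backfill_key", "region"], "overwrite", rows)
user = {"delta_write_options": {"partition_by": ["region"]}}
opts2 = delta_write_options(user, ["backfill_key"], "append", rows)
```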

write_with_retry(context, obj)

Write with retry logic.

DeltaMerge and DataFrame/LazyFrame writes are all retried when RetryConfig is configured.

Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def write_with_retry(self, context: OutputContext, obj: Any) -> None:
    """Write with retry logic.

    `DeltaMerge` and DataFrame/LazyFrame writes are all retried
    when `RetryConfig` is configured.
    """
    # DeltaMerge — retry-safe path: rebuild merger on each attempt.
    if isinstance(obj, DeltaMerge):
        if self.retry is None:
            self.write(context, obj)
            return

        uri = self._uri(context.asset_name)

        def _execute_merge() -> None:
            merger = obj._build_merger(uri, storage_options=self.storage_options)
            if merger is None:
                obj._initial_write(
                    uri,
                    storage_options=self.storage_options,
                    partition_by=context.partition_by,
                    write_options=dict(self._write_options),
                )
            else:
                merger.execute()

        wait_kwargs: dict[str, Any] = {
            "multiplier": self.retry.delay,
            "exp_base": self.retry.backoff_factor,
        }
        if self.retry.max_delay is not None:
            wait_kwargs["max"] = self.retry.max_delay

        retryer = retry(
            stop=stop_after_attempt(self.retry.max_attempts),
            wait=wait_exponential(**wait_kwargs),
            reraise=True,
            before_sleep=before_sleep_log(_logger, logging.WARNING),
        )
        retryer(_execute_merge)()
        if context.partition_by:
            self._last_partition_values = _polars_extract_partition_values(
                obj.source, context.partition_by
            )
        else:
            self._last_partition_values = {}
        return

    super().write_with_retry(context, obj)
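The DeltaMerge branch rebuilds the merger inside _execute_merge. Every retry therefore starts from a fresh merger and re-checks whether the target table exists, instead of re-executing an object that already failed. Below is a minimal stdlib sketch of that rebuild-per-attempt pattern. TransientConflict, SingleUseMerger, retry_with_rebuild and the injectable sleep are all hypothetical. In the real code, tenacity drives the loop.

```python
import time
from typing import Callable


class TransientConflict(Exception):
    """Hypothetical stand-in for a concurrent Delta commit conflict."""


class SingleUseMerger:
    """Stand-in for a merger that can only be executed once."""

    def __init__(self, outcomes: list):
        self._outcomes = outcomes  # shared script of results, one per execution
        self._used = False

    def execute(self) -> str:
        if self._used:
            raise RuntimeError("merger already executed")
        self._used = True
        outcome = self._outcomes.pop(0)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome


def retry_with_rebuild(
    build: Callable[[], SingleUseMerger],
    max_attempts: int,
    delay: float,
    backoff_factor: float,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Build a fresh merger for every attempt; re-raise the last error when exhausted."""
    for attempt in range(1, max_attempts + 1):
        merger = build()  # never reuse the object from a failed attempt
        try:
            return merger.execute()
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(delay * backoff_factor ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


outcomes = [TransientConflict("conflict"), TransientConflict("conflict"), "merged"]
built = []


def build() -> SingleUseMerger:
    merger = SingleUseMerger(outcomes)
    built.append(merger)
    return merger


slept = []
result = retry_with_rebuild(build, max_attempts=3, delay=0.5, backoff_factor=2.0, sleep=slept.append)
```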

read(context)

Read a Delta table as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_delta (eager). Otherwise returns scan_delta (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by includes "backfill_key", reads are filtered to the current partition unless the upstream dependency uses all_partitions() or the consuming task uses @task(all_partitions=True).

Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
def read(self, context: InputContext) -> Any:
    """Read a Delta table as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_delta`` (eager).  Otherwise returns ``scan_delta``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` includes ``"backfill_key"``, reads are
    filtered to the current partition unless the upstream
    dependency uses `all_partitions()` or the consuming
    task uses ``@task(all_partitions=True)``.
    """
    import polars as pl  # noqa: PLC0415

    uri = self._uri(context.upstream_asset_name)
    _logger.info(
        "Reading from %s (partition_filter=%s)", uri, context.partition_filter
    )

    if context.expected_type is pl.DataFrame:
        result = pl.read_delta(
            uri, storage_options=self.storage_options, **self._read_options
        )
    else:
        result = pl.scan_delta(
            uri, storage_options=self.storage_options, **self._read_options
        )

    if context.partition_filter and not context.all_partitions:
        result = _polars_apply_partition_filter(result, context.partition_filter)
    elif (
        self.auto_filter
        and _needs_backfill_key_col(context.partition_by)
        and not context.all_partitions
    ):
        result = result.filter(
            pl.col("backfill_key") == _resolve_backfill_key(context.backfill_key)
        )

    return result

JSON (NDJSON)

databricks_bundle_decorators.io_managers.PolarsJsonIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)

Bases: IoManager

Persist Polars DataFrames as NDJSON on any cloud or local filesystem.

Uses newline-delimited JSON (NDJSON), a common format for streaming data pipelines. It is the only JSON variant in Polars that supports cloud storage (storage_options) and lazy I/O.

Write dispatch:

  • polars.LazyFrame → sink_ndjson
  • polars.DataFrame → .lazy() then sink_ndjson (routed through the lazy path for cloud storage support)

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_ndjson; otherwise (including unannotated parameters) a lazy scan_ndjson is used by default.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for NDJSON files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters:

from databricks_bundle_decorators import params

io = PolarsJsonIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)

Required (no default).
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime:

from databricks_bundle_decorators import get_dbutils


def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}


io = PolarsJsonIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)

Default: None.
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (sink_ndjson).

Do not include storage_options here — use the dedicated parameter instead.

Default: None.
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_ndjson / scan_ndjson).

Default: None.
retry RetryConfig | None

Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient write conflicts during concurrent backfill runs. Defaults to None (no retries).

Example

import polars as pl

from databricks_bundle_decorators.io_managers import PolarsJsonIoManager

io = PolarsJsonIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)


@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_ndjson on write
    return pl.LazyFrame({"a": [1, 2]})


@task
def transform(df: pl.LazyFrame):  # scan_ndjson on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter
    self.retry = retry

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.

write(context, obj)

Write a Polars DataFrame or LazyFrame as NDJSON.

  • polars.LazyFrame → sink_ndjson
  • polars.DataFrame → .lazy() then sink_ndjson

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories using pl.PartitionBy.

Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame as NDJSON.

    - `polars.LazyFrame` → ``sink_ndjson``
    - `polars.DataFrame` → ``.lazy()`` then ``sink_ndjson``

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories using
    ``pl.PartitionBy``.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.asset_name)
    partition_by = context.partition_by
    _logger.info("Writing to %s (partition_by=%s)", base_uri, partition_by)

    # Inject backfill_key column if it's a partition column
    has_bk_col = isinstance(
        obj, (pl.DataFrame, pl.LazyFrame)
    ) and "backfill_key" in (
        obj.collect_schema().names()
        if isinstance(obj, pl.LazyFrame)
        else obj.columns
    )
    if _should_inject_backfill_key(partition_by, has_backfill_key_col=has_bk_col):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_ndjson(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_ndjson(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsJsonIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.ndjson"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_ndjson(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_ndjson(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsJsonIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)

read(context)

Read NDJSON as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_ndjson (eager). Otherwise returns scan_ndjson (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.

Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
def read(self, context: InputContext) -> Any:
    """Read NDJSON as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_ndjson`` (eager).  Otherwise returns ``scan_ndjson``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set, reads from the Hive-partitioned
    directory.  By default only the current ``backfill_key``
    partition is returned; use `all_partitions()` on the
    upstream dependency or ``@task(all_partitions=True)`` on
    the consuming task to read all partitions.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.upstream_asset_name)
    partition_by = context.partition_by
    _logger.info(
        "Reading from %s (partition_filter=%s)", base_uri, context.partition_filter
    )

    if partition_by:
        glob_uri = f"{base_uri}/**/*.jsonl"
        if context.expected_type is pl.DataFrame:
            result = pl.read_ndjson(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_ndjson(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.ndjson"
    if context.expected_type is pl.DataFrame:
        return pl.read_ndjson(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_ndjson(
        uri, storage_options=self.storage_options, **self._read_options
    )

CSV

databricks_bundle_decorators.io_managers.PolarsCsvIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)

Bases: IoManager

Persist Polars DataFrames as CSV on any cloud or local filesystem.

Write dispatch:

  • polars.LazyFrame → sink_csv
  • polars.DataFrame → write_csv

On the read side, the downstream task's parameter type annotation determines the method used. Annotate the parameter as pl.DataFrame to receive an eager read_csv; otherwise (including unannotated parameters) a lazy scan_csv is used by default.
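The annotation-driven dispatch can be illustrated without Polars installed. This sketch uses hypothetical stand-in classes and assumes the expected type is taken from the parameter annotation via `inspect.signature`; how the library actually populates `context.expected_type` is not shown on this page.

```python
import inspect


class StubDataFrame:  # hypothetical stand-in for polars.DataFrame
    pass


class StubLazyFrame:  # hypothetical stand-in for polars.LazyFrame
    pass


def reader_for(func, param: str) -> str:
    """Pick the CSV reader name from a parameter's annotation (sketch)."""
    annotation = inspect.signature(func).parameters[param].annotation
    # Only an exact DataFrame annotation selects the eager reader; LazyFrame,
    # other types, and missing annotations (inspect.Parameter.empty) fall
    # back to the lazy scan, matching `expected_type is pl.DataFrame`.
    return "read_csv" if annotation is StubDataFrame else "scan_csv"


def eager_task(df: StubDataFrame): ...
def lazy_task(df: StubLazyFrame): ...
def untyped_task(df): ...
```

With postponed annotations (`from __future__ import annotations`) the annotation would be a string, so a real implementation would need `typing.get_type_hints` instead.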

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for CSV files. Can be a local path (/tmp/data), an Azure URI (abfss://container@account.dfs.core.windows.net/path), an S3 URI (s3://bucket/prefix), a GCS URI (gs://bucket/prefix), or any other URI scheme that Polars supports.

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters:

from databricks_bundle_decorators import params

io = PolarsCsvIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
storage_options dict[str, str] | Callable[[], dict[str, str]] | None

Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or None.

Use a callable to defer credential lookup to runtime:

from databricks_bundle_decorators import get_dbutils


def _storage_options() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {"account_name": "myaccount", "account_key": key}


io = PolarsCsvIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options=_storage_options,
)
None
write_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars write call (write_csv / sink_csv). For example:

{"separator": ";", "quote_char": '"'}

Do not include storage_options here — use the dedicated parameter instead.

None
read_options dict[str, Any] | None

Extra keyword arguments forwarded to the Polars read call (read_csv / scan_csv).

None
retry RetryConfig | None

Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient write conflicts during concurrent backfill runs. Defaults to None (no retries).

None
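The retry parameter's exponential backoff can be sketched with the standard library alone. The fields of `RetryConfig` are not documented on this page, so `max_attempts`, `base_delay`, `max_delay`, and `retry_on` below are hypothetical names; the library itself uses tenacity rather than this loop.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: tuple[type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on exceptions with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt (base, 2*base, 4*base, ...), capped.
            sleep(min(base_delay * 2 ** (attempt - 1), max_delay))
    raise AssertionError("unreachable")
```

Injecting `sleep` keeps the sketch testable; a write that hits a transient conflict twice would sleep 0.5 s and then 1.0 s before its third attempt succeeds.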
Example

import polars as pl
from databricks_bundle_decorators.io_managers import PolarsCsvIoManager

io = PolarsCsvIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)


@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_csv on write
    return pl.LazyFrame({"a": [1, 2]})


@task
def transform(df: pl.LazyFrame):  # scan_csv on read
    print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    storage_options: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, Any] | None = None,
    read_options: dict[str, Any] | None = None,
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    self._base_path = base_path
    self._storage_options = storage_options
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter
    self.retry = retry

base_path property

Resolve base_path, calling it first if it is a callable.

storage_options property

Resolve storage_options, calling it first if it is a callable.
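The two properties above follow a simple "value or factory" pattern. The class below is a hypothetical stand-in, not the library's code; it assumes the factory is called on every access, consistent with storage_options being "resolved lazily on each read/write".

```python
from typing import Callable, Optional, Union

PathOrFactory = Union[str, Callable[[], str]]
OptionsOrFactory = Union[dict[str, str], Callable[[], dict[str, str]], None]


class LazyPathConfig:
    """Hypothetical stand-in showing how base_path/storage_options resolve."""

    def __init__(self, base_path: PathOrFactory, storage_options: OptionsOrFactory = None):
        self._base_path = base_path
        self._storage_options = storage_options

    @property
    def base_path(self) -> str:
        # Call the factory on each access so job parameters are read late.
        return self._base_path() if callable(self._base_path) else self._base_path

    @property
    def storage_options(self) -> Optional[dict[str, str]]:
        # Same rule: plain dict or None is returned as-is.
        opts = self._storage_options
        return opts() if callable(opts) else opts
```

Deferring the call is what lets a lambda reading `params['env']` or a secrets lookup run inside the job rather than at import time.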

write(context, obj)

Write a Polars DataFrame or LazyFrame as CSV.

  • polars.LazyFrame → sink_csv
  • polars.DataFrame → write_csv (single file) or .lazy().sink_csv(PartitionBy) (partitioned)

When partition_by is set on the @task decorator, writes to Hive-style partitioned directories using pl.PartitionBy.
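Roughly, a partitioned write groups rows into `key=value` directories after injecting `backfill_key`. This pure-Python sketch models rows as dicts; `hive_layout` is hypothetical, and the injection rule (only when `backfill_key` is a partition column and the data lacks it) is an assumption about `_should_inject_backfill_key`. Polars itself may escape special characters in directory names.

```python
from collections import defaultdict
from typing import Any


def hive_layout(
    rows: list[dict[str, Any]],
    partition_by: list[str],
    base_uri: str,
    backfill_key: str,
) -> dict[str, list[dict[str, Any]]]:
    """Group rows into Hive-style directories base/k1=v1/k2=v2 (sketch)."""
    out: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for row in rows:
        row = dict(row)  # copy so the caller's data is not mutated
        # Assumed rule: inject only if backfill_key is a partition column
        # and the row does not already carry it.
        if "backfill_key" in partition_by and "backfill_key" not in row:
            row["backfill_key"] = backfill_key
        # Partition columns become path segments, in partition_by order.
        segments = "/".join(f"{col}={row[col]}" for col in partition_by)
        out[f"{base_uri}/{segments}"].append(row)
    return dict(out)
```

For `partition_by=["backfill_key", "region"]`, rows for two regions on one backfill date land in two sibling directories under the same `backfill_key=...` parent.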

Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def write(self, context: OutputContext, obj: Any) -> None:
    """Write a Polars DataFrame or LazyFrame as CSV.

    - `polars.LazyFrame` → ``sink_csv``
    - `polars.DataFrame` → ``write_csv`` (single file) or
      ``.lazy().sink_csv(PartitionBy)`` (partitioned)

    When ``partition_by`` is set on the ``@task`` decorator, writes
    to Hive-style partitioned directories using
    ``pl.PartitionBy``.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.asset_name)
    partition_by = context.partition_by
    _logger.info("Writing to %s (partition_by=%s)", base_uri, partition_by)

    # Inject backfill_key column if it's a partition column
    has_bk_col = isinstance(
        obj, (pl.DataFrame, pl.LazyFrame)
    ) and "backfill_key" in (
        obj.collect_schema().names()
        if isinstance(obj, pl.LazyFrame)
        else obj.columns
    )
    if _should_inject_backfill_key(partition_by, has_backfill_key_col=has_bk_col):
        bk = _resolve_backfill_key(context.backfill_key)
        obj = obj.with_columns(pl.lit(bk).alias("backfill_key"))

    if partition_by:
        # Extract partition values from data before writing
        self._last_partition_values = _polars_extract_partition_values(
            obj, partition_by
        )
        if isinstance(obj, pl.LazyFrame):
            obj.sink_csv(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        elif isinstance(obj, pl.DataFrame):
            obj.lazy().sink_csv(
                pl.PartitionBy(base_uri, key=partition_by),
                mkdir=True,
                storage_options=self.storage_options,
                **self._write_options,
            )
        else:
            msg = (
                f"PolarsCsvIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
    else:
        uri = f"{base_uri}.csv"
        if isinstance(obj, pl.LazyFrame):
            obj.sink_csv(
                uri, storage_options=self.storage_options, **self._write_options
            )
        elif isinstance(obj, pl.DataFrame):
            obj.write_csv(
                uri, storage_options=self.storage_options, **self._write_options
            )
        else:
            msg = (
                f"PolarsCsvIoManager.write() expects a polars.DataFrame or "
                f"polars.LazyFrame, got {type(obj).__name__}"
            )
            raise TypeError(msg)
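The write call above passes `storage_options=self.storage_options` explicitly and then splats `**self._write_options`, which is why `storage_options` must not appear in write_options. The sketch below reproduces that call shape with a hypothetical stand-in writer (`fake_sink_csv`), so no Polars install is needed.

```python
def fake_sink_csv(path, *, storage_options=None, **kwargs):
    """Hypothetical stand-in for a Polars writer; echoes what it received."""
    return {"path": path, "storage_options": storage_options, **kwargs}


def write(uri, storage_options, write_options):
    # Same call shape as PolarsCsvIoManager.write(): storage_options passed
    # explicitly, extra options splatted from write_options.
    return fake_sink_csv(uri, storage_options=storage_options, **write_options)
```

Passing `{"separator": ";"}` works, but a write_options dict containing `storage_options` makes Python raise `TypeError` (multiple values for the same keyword) at the call site, before the writer even runs.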

read(context)

Read CSV as a LazyFrame or DataFrame.

If the downstream parameter is annotated as polars.DataFrame, returns read_csv (eager). Otherwise returns scan_csv (lazy polars.LazyFrame) — this is the default for unannotated parameters.

When partition_by is set, reads from the Hive-partitioned directory. By default only the current backfill_key partition is returned; use all_partitions() on the upstream dependency or @task(all_partitions=True) on the consuming task to read all partitions.
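Conceptually, the `key=value` directory segments are what make partition columns filterable on read. This stdlib sketch parses them from file paths and keeps only matching files; `partition_values` and `keep_file` are hypothetical helpers, whereas the library applies a Polars filter expression to the scanned frame instead.

```python
from pathlib import PurePosixPath


def partition_values(file_path: str, base_uri: str) -> dict[str, str]:
    """Extract Hive partition values (key=value segments) below base_uri."""
    rel = PurePosixPath(file_path).relative_to(base_uri)
    values: dict[str, str] = {}
    for part in rel.parts[:-1]:  # the last part is the file name
        key, sep, value = part.partition("=")
        if sep:  # ignore directories that are not key=value
            values[key] = value
    return values


def keep_file(file_path: str, base_uri: str, wanted: dict[str, str]) -> bool:
    """True if every wanted column matches the file's partition values."""
    vals = partition_values(file_path, base_uri)
    return all(vals.get(k) == v for k, v in wanted.items())
```

With the default auto filter, `wanted` would be `{"backfill_key": <current key>}`, so files from other backfill dates are excluded.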

Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
def read(self, context: InputContext) -> Any:
    """Read CSV as a LazyFrame or DataFrame.

    If the downstream parameter is annotated as `polars.DataFrame`,
    returns ``read_csv`` (eager).  Otherwise returns ``scan_csv``
    (lazy `polars.LazyFrame`) — this is the default for
    unannotated parameters.

    When ``partition_by`` is set, reads from the Hive-partitioned
    directory.  By default only the current ``backfill_key``
    partition is returned; use `all_partitions()` on the
    upstream dependency or ``@task(all_partitions=True)`` on
    the consuming task to read all partitions.
    """
    import polars as pl  # noqa: PLC0415

    base_uri = self._uri(context.upstream_asset_name)
    partition_by = context.partition_by
    _logger.info(
        "Reading from %s (partition_filter=%s)", base_uri, context.partition_filter
    )

    if partition_by:
        glob_uri = f"{base_uri}/**/*.csv"
        if context.expected_type is pl.DataFrame:
            result = pl.read_csv(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        else:
            result = pl.scan_csv(
                glob_uri,
                storage_options=self.storage_options,
                **self._read_options,
            )
        if context.partition_filter and not context.all_partitions:
            result = _polars_apply_partition_filter(
                result, context.partition_filter
            )
        elif (
            self.auto_filter
            and _needs_backfill_key_col(partition_by)
            and not context.all_partitions
        ):
            result = result.filter(
                pl.col("backfill_key")
                == _resolve_backfill_key(context.backfill_key)
            )
        return result

    uri = f"{base_uri}.csv"
    if context.expected_type is pl.DataFrame:
        return pl.read_csv(
            uri, storage_options=self.storage_options, **self._read_options
        )
    return pl.scan_csv(
        uri, storage_options=self.storage_options, **self._read_options
    )
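Both read() implementations on this page choose between a single-file URI and a recursive glob. The hypothetical helper below records the extensions visible in the source: CSV uses `.csv` in both cases, while NDJSON reads `<base>.ndjson` unpartitioned but globs `**/*.jsonl` when partitioned.

```python
def read_target(base_uri: str, partitioned: bool, fmt: str) -> str:
    """Return the path or glob read for one asset (hypothetical helper)."""
    # Extensions as they appear in the read() sources above.
    single_ext = {"csv": "csv", "ndjson": "ndjson"}
    partition_glob = {"csv": "*.csv", "ndjson": "*.jsonl"}
    if partitioned:
        # Recursive glob matches files at any partition depth.
        return f"{base_uri}/**/{partition_glob[fmt]}"
    return f"{base_uri}.{single_ext[fmt]}"
```

For example, an asset at `/data/extract` is read from `/data/extract.csv` without partitioning and from `/data/extract/**/*.csv` with it.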