Custom IoManagers

Subclass IoManager to implement your own storage backend when the built-in IoManagers don't cover your use case (e.g. writing to a REST API, a message queue, or a custom file format).

Partitioning support

When building a custom IoManager, use context.backfill_key to scope storage to the current partition, and context.all_partitions to support cross-partition reads (triggered by the all_partitions() wrapper or @task(all_partitions=True)).

Auto-filtering via task values

By default (auto_filter=True), the runtime pushes the distinct partition values written by the producer to downstream consumers via Databricks task values. On the read side, these values are available in context.partition_filter — a dict[str, list[str]] mapping column names to the values that were written.
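Conceptually, the filter is an AND of per-column IN clauses: a row survives only if, for every column in the mapping, its value appears in that column's list. A minimal stdlib sketch over in-memory rows (the helper name and sample data are illustrative, not part of the library):

```python
# Sketch: applying a partition_filter (dict[str, list[str]]) to rows
# represented as dicts. A real IoManager would translate this into a
# storage-level predicate instead of filtering in memory.
def apply_partition_filter(rows, partition_filter):
    if not partition_filter:
        return list(rows)
    return [
        row for row in rows
        if all(str(row.get(col)) in values
               for col, values in partition_filter.items())
    ]

rows = [
    {"region": "eu", "day": "2024-01-01", "amount": 10},
    {"region": "us", "day": "2024-01-01", "amount": 20},
    {"region": "eu", "day": "2024-01-02", "amount": 30},
]
# The producer wrote only region=eu for these two days:
flt = {"region": ["eu"], "day": ["2024-01-01", "2024-01-02"]}
apply_partition_filter(rows, flt)  # keeps only the two "eu" rows
```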

To opt into auto-filtering in a custom IoManager:

  1. Accept auto_filter in your __init__ and set self.auto_filter.
  2. Override _extract_partition_values to return the distinct values for each partition column after a write.
  3. In read(), check context.partition_filter and apply it.
from databricks_bundle_decorators import IoManager, OutputContext, InputContext

class MyIoManager(IoManager):
    def __init__(self, base_path: str, *, auto_filter: bool = True) -> None:
        self.base_path = base_path
        self.auto_filter = auto_filter

    def _path(self, task_key: str) -> str:
        return f"{self.base_path}/{task_key}"

    def write(self, context: OutputContext, obj):
        path = self._path(context.task_key)
        if context.backfill_key:
            path = f"{path}/backfill_key={context.backfill_key}"
        save(path, obj)  # save() is a placeholder for your storage backend

    def _extract_partition_values(
        self, context: OutputContext
    ) -> dict[str, list[str]]:
        # Placeholder: scan what write() just produced and return the
        # distinct values per partition column.
        return extract_distinct_values(self._path(context.task_key), context.partition_by)

    def read(self, context: InputContext):
        path = self._path(context.upstream_task_key)
        if context.all_partitions:
            return load_all(path)  # Read all partition directories
        if context.partition_filter:
            return load_filtered(path, context.partition_filter)
        if context.backfill_key:
            path = f"{path}/backfill_key={context.backfill_key}"
        return load(path)

If you set auto_filter=False, context.partition_filter will always be None and only the backfill_key fallback applies.

Delta replaceWhere example

All built-in Delta IoManagers automatically apply replaceWhere (Spark) or predicate (delta-rs / Polars) when mode="overwrite" is combined with partition_by — see Partition-scoped overwrite.
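Both the Spark `replaceWhere` option and the delta-rs/Polars `predicate` take a SQL boolean expression over the partition columns. One way such a predicate string could be assembled from a partition-values mapping (hypothetical helper; values are single-quoted with naive `''` escaping, which suffices for simple partition keys):

```python
# Sketch: building a replaceWhere / delta-rs predicate string from
# partition values — one IN clause per column, AND-ed together.
def build_predicate(partition_values: dict[str, list[str]]) -> str:
    clauses = []
    for col, values in partition_values.items():
        quoted = ", ".join("'" + v.replace("'", "''") + "'" for v in values)
        clauses.append(f"{col} IN ({quoted})")
    return " AND ".join(clauses)

build_predicate({"backfill_key": ["2024-01-01"]})
# → "backfill_key IN ('2024-01-01')"
```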

If you are writing a custom Delta IoManager, you should apply the same pattern to avoid destroying data in other partitions during backfill runs:

from databricks_bundle_decorators import IoManager, OutputContext, InputContext
from pyspark.sql import SparkSession, functions as F


class DeltaReplaceWhereIoManager(IoManager):
    """Write to one Delta table, overwriting only the current partition."""

    def __init__(self, base_path: str) -> None:
        self.base_path = base_path

    def write(self, context: OutputContext, obj) -> None:
        uri = f"{self.base_path}/{context.task_key}"
        bk = context.backfill_key or "unknown"
        obj = obj.withColumn("backfill_key", F.lit(bk))
        (
            obj.write.format("delta")
            .mode("overwrite")
            .option("replaceWhere", f"backfill_key = '{bk}'")
            .partitionBy("backfill_key")
            .save(uri)
        )

    def read(self, context: InputContext):
        spark = SparkSession.getActiveSession()
        uri = f"{self.base_path}/{context.upstream_task_key}"
        df = spark.read.format("delta").load(uri)
        # Scope the read to the current partition unless a
        # cross-partition read was requested.
        if context.backfill_key and not context.all_partitions:
            df = df.filter(F.col("backfill_key") == context.backfill_key)
        return df

API Reference

databricks_bundle_decorators.io_manager.IoManager

Bases: ABC

Base class for managing data transfer between tasks.

Each @task can optionally declare an IoManager that controls how its return value is persisted and how downstream tasks read that data.

Lifecycle

IoManager instances are created at import time during both deploy and runtime phases. __init__ must therefore be safe to run locally without a Databricks runtime — do not import modules like pyspark.dbutils or establish cluster-only connections there.

Instead, override setup for any initialisation that requires a Databricks runtime environment. The framework calls setup() exactly once per instance, at runtime only, before the first read or write invocation.

Example

from typing import Any

import polars as pl
from databricks_bundle_decorators import IoManager, OutputContext, InputContext

class DeltaIoManager(IoManager):
    def __init__(self, catalog: str, schema: str):
        self.catalog = catalog
        self.schema = schema

    def setup(self) -> None:
        # Safe here — only called at runtime on Databricks.
        from pyspark.dbutils import DBUtils          # noqa: F401
        self.dbutils = DBUtils(...)

    def write(self, context: OutputContext, obj: Any) -> None:
        table = f"{self.catalog}.{self.schema}.{context.task_key}"
        obj.write_delta(table, mode="overwrite")

    def read(self, context: InputContext) -> Any:
        table = f"{self.catalog}.{self.schema}.{context.upstream_task_key}"
        return pl.read_delta(table)

auto_filter = True class-attribute instance-attribute

When True, partition values are pushed via task values on write and used to auto-filter reads. Set to False to disable.

setup()

Initialise runtime-only resources.

Override this method to perform initialisation that requires a Databricks cluster environment (Spark sessions, DBUtils, secret scopes, etc.). The framework guarantees this is called once before the first read or write, and only at runtime — never during databricks bundle deploy.

The default implementation does nothing.

Source code in src/databricks_bundle_decorators/io_manager.py
def setup(self) -> None:
    """Initialise runtime-only resources.

    Override this method to perform initialisation that requires a
    Databricks cluster environment (Spark sessions, DBUtils, secret
    scopes, etc.).  The framework guarantees this is called **once**
    before the first `read` or `write`, and **only at
    runtime** — never during ``databricks bundle deploy``.

    The default implementation does nothing.
    """

write(context, obj) abstractmethod

Persist the return value of a task.

Source code in src/databricks_bundle_decorators/io_manager.py
@abstractmethod
def write(self, context: OutputContext, obj: Any) -> None:
    """Persist the return value of a task."""
    ...

read(context) abstractmethod

Read the output of an upstream task for use downstream.

Source code in src/databricks_bundle_decorators/io_manager.py
@abstractmethod
def read(self, context: InputContext) -> Any:
    """Read the output of an upstream task for use downstream."""
    ...

databricks_bundle_decorators.io_manager.OutputContext(job_name, task_key, run_id, backfill_key=None, partition_by=None) dataclass

Context provided to IoManager.write when persisting a task's return value.

databricks_bundle_decorators.io_manager.InputContext(job_name, task_key, upstream_task_key, run_id, expected_type=None, backfill_key=None, all_partitions=False, partition_by=None, partition_filter=None) dataclass

Context provided to IoManager.read when retrieving upstream output.

Attributes:

expected_type (type | None)

The type annotation of the downstream task's parameter, if available. IoManagers can use this to return the appropriate type (e.g. polars.LazyFrame vs polars.DataFrame).

backfill_key (str | None)

The backfill key of the current run. IoManagers use this to scope reads to the correct partition.

all_partitions (bool)

When True, the IoManager should read all partitions instead of filtering to the current backfill_key. Set when the upstream dependency is wrapped with all_partitions() or when the consuming task uses @task(all_partitions=True).

partition_filter (dict[str, list[str]] | None)

Mapping of partition column names to their written values, pushed by the producing task via task values. When set, the IoManager uses these values to filter the read result. Populated automatically when auto_filter=True on the producing IoManager.