Custom IoManagers¶
Subclass IoManager to implement your own storage backend when the
built-in IoManagers don't cover your use case
(e.g. writing to a REST API, a message queue, or a custom file format).
Partitioning support¶
When building a custom IoManager, use context.backfill_key to scope
storage to the current partition, and context.all_partitions to
support cross-partition reads (triggered by the all_partitions()
wrapper or @task(all_partitions=True)).
Auto-filtering via task values¶
By default (auto_filter=True), the runtime pushes the distinct
partition values written by the producer to downstream consumers via
Databricks task values. On the read side, these values are available
in context.partition_filter — a dict[str, list[str]] mapping
column names to the values that were written.
To opt into auto-filtering in a custom IoManager:
- Accept `auto_filter` in your `__init__` and set `self.auto_filter`.
- Override `_extract_partition_values` to return the distinct values for each partition column after a write.
- In `read()`, check `context.partition_filter` and apply it.
```python
from databricks_bundle_decorators import IoManager, OutputContext, InputContext

# save/load_all/load_filtered/load/extract_distinct_values are
# placeholders for your storage backend.


class MyIoManager(IoManager):
    def __init__(self, base_path: str, *, auto_filter: bool = True) -> None:
        self.base_path = base_path
        self.auto_filter = auto_filter

    def write(self, context: OutputContext, obj):
        path = f"{self.base_path}/{context.task_key}"
        if context.backfill_key:
            path = f"{path}/backfill_key={context.backfill_key}"
        save(path, obj)

    def _extract_partition_values(
        self, context: OutputContext
    ) -> dict[str, list[str]]:
        path = f"{self.base_path}/{context.task_key}"
        return extract_distinct_values(path, context.partition_by)

    def read(self, context: InputContext):
        path = f"{self.base_path}/{context.upstream_task_key}"
        if context.all_partitions:
            return load_all(path)  # Read all partition directories
        if context.partition_filter:
            return load_filtered(path, context.partition_filter)
        if context.backfill_key:
            path = f"{path}/backfill_key={context.backfill_key}"
        return load(path)
```
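On the read side, honouring `context.partition_filter` amounts to keeping only rows whose partition columns match the written values. A minimal pure-Python sketch (`apply_partition_filter` is a hypothetical helper, shown with plain dicts instead of DataFrames):

```python
def apply_partition_filter(rows, partition_filter):
    """Keep only rows whose partition columns match the written values.

    partition_filter mirrors context.partition_filter:
    a dict[str, list[str]] mapping column name -> allowed values.
    """
    if not partition_filter:
        return rows
    return [
        row for row in rows
        if all(str(row[col]) in values for col, values in partition_filter.items())
    ]

rows = [
    {"region": "eu", "amount": 10},
    {"region": "us", "amount": 20},
    {"region": "eu", "amount": 30},
]
print(apply_partition_filter(rows, {"region": ["eu"]}))
# [{'region': 'eu', 'amount': 10}, {'region': 'eu', 'amount': 30}]
```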
If you set `auto_filter=False`, `context.partition_filter` will
always be `None` and only the `backfill_key` fallback applies.
Delta replaceWhere example¶
All built-in Delta IoManagers automatically apply replaceWhere (Spark)
or predicate (delta-rs / Polars) when mode="overwrite" is combined
with partition_by — see Partition-scoped overwrite.
If you are writing a custom Delta IoManager, you should apply the same pattern to avoid destroying data in other partitions during backfill runs:
```python
from databricks_bundle_decorators import IoManager, OutputContext, InputContext
from pyspark.sql import SparkSession, functions as F


class DeltaReplaceWhereIoManager(IoManager):
    """Write to one Delta table, overwriting only the current partition."""

    def __init__(self, base_path: str) -> None:
        self.base_path = base_path

    def write(self, context: OutputContext, obj) -> None:
        uri = f"{self.base_path}/{context.task_key}"
        bk = context.backfill_key or "unknown"
        obj = obj.withColumn("backfill_key", F.lit(bk))
        (
            obj.write.format("delta")
            .mode("overwrite")
            # replaceWhere limits the overwrite to the current partition,
            # so other backfill_key partitions survive the write.
            .option("replaceWhere", f"backfill_key = '{bk}'")
            .partitionBy("backfill_key")
            .save(uri)
        )

    def read(self, context: InputContext):
        spark = SparkSession.getActiveSession()
        uri = f"{self.base_path}/{context.upstream_task_key}"
        return spark.read.format("delta").load(uri)
```
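When the table is partitioned by more than just `backfill_key`, the `replaceWhere` string can be derived from a `dict[str, list[str]]` of written partition values. A standalone sketch (`replace_where_predicate` is a hypothetical helper; quoting follows Spark SQL's rule of doubling embedded single quotes):

```python
def replace_where_predicate(partition_filter: dict[str, list[str]]) -> str:
    """Build a Delta replaceWhere predicate from written partition values."""
    clauses = []
    for col, values in sorted(partition_filter.items()):
        # Single-quote each value; escape embedded quotes by doubling them.
        quoted = ", ".join("'" + v.replace("'", "''") + "'" for v in values)
        clauses.append(f"{col} IN ({quoted})")
    return " AND ".join(clauses)

print(replace_where_predicate({"backfill_key": ["2024-01-01"], "region": ["eu", "us"]}))
# backfill_key IN ('2024-01-01') AND region IN ('eu', 'us')
```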
API Reference¶
databricks_bundle_decorators.io_manager.IoManager ¶
Bases: ABC
Base class for managing data transfer between tasks.
Each @task can optionally declare an IoManager that controls how
its return value is persisted and how downstream tasks read that data.
Lifecycle
IoManager instances are created at import time during both deploy
and runtime phases. __init__ must therefore be safe to run locally
without a Databricks runtime — do not import modules like
pyspark.dbutils or establish cluster-only connections there.
Instead, override setup for any initialisation that requires a
Databricks runtime environment. The framework calls setup()
exactly once per instance, at runtime only, before the first
read or write invocation.
Example
```python
from typing import Any

import polars as pl
from databricks_bundle_decorators import IoManager, OutputContext, InputContext


class DeltaIoManager(IoManager):
    def __init__(self, catalog: str, schema: str):
        self.catalog = catalog
        self.schema = schema

    def setup(self) -> None:
        # Safe here — only called at runtime on Databricks.
        from pyspark.dbutils import DBUtils  # noqa: F401

        self.dbutils = DBUtils(...)

    def write(self, context: OutputContext, obj: Any) -> None:
        table = f"{self.catalog}.{self.schema}.{context.task_key}"
        obj.write_delta(table, mode="overwrite")

    def read(self, context: InputContext) -> Any:
        table = f"{self.catalog}.{self.schema}.{context.upstream_task_key}"
        return pl.read_delta(table)
```
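The setup-once guarantee described above can be simulated with a small standalone harness (this re-implements the contract in plain Python for illustration; it is not the framework's actual runner):

```python
class LazyIoManager:
    """Defers runtime-only init to setup(), mirroring the lifecycle contract."""

    def __init__(self):
        # Must be import-safe: no cluster-only work here.
        self._ready = False
        self.setup_calls = 0

    def setup(self):
        # Stand-in for acquiring Spark sessions, DBUtils, secrets, etc.
        self.setup_calls += 1
        self._ready = True

    def _ensure_setup(self):
        # What the framework guarantees: setup() runs exactly once,
        # before the first read or write.
        if not self._ready:
            self.setup()

    def write(self, obj):
        self._ensure_setup()

    def read(self):
        self._ensure_setup()
        return "data"

mgr = LazyIoManager()
mgr.write("x")
mgr.read()
print(mgr.setup_calls)
# 1
```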
`auto_filter = True` · class-attribute, instance-attribute ¶
When True, partition values are pushed via task values on write and used to auto-filter reads. Set to False to disable.
setup() ¶
Initialise runtime-only resources.
Override this method to perform initialisation that requires a
Databricks cluster environment (Spark sessions, DBUtils, secret
scopes, etc.). The framework guarantees this is called once
before the first read or write, and only at
runtime — never during databricks bundle deploy.
The default implementation does nothing.
Source code in src/databricks_bundle_decorators/io_manager.py
`write(context, obj)` · abstractmethod ¶
Persist a task's return value. Subclasses must implement this.
databricks_bundle_decorators.io_manager.OutputContext(job_name, task_key, run_id, backfill_key=None, partition_by=None) · dataclass ¶
Context provided to IoManager.write when persisting a task's return value.
databricks_bundle_decorators.io_manager.InputContext(job_name, task_key, upstream_task_key, run_id, expected_type=None, backfill_key=None, all_partitions=False, partition_by=None, partition_filter=None) · dataclass ¶
Context provided to IoManager.read when retrieving upstream output.
Attributes:

| Name | Type | Description |
|---|---|---|
| `expected_type` | `type \| None` | The type annotation of the downstream task's parameter, if available. IoManagers can use this to return the appropriate type. |
| `backfill_key` | `str \| None` | The backfill key of the current run. IoManagers use this to scope reads to the correct partition. |
| `all_partitions` | `bool` | When `True`, the IoManager should read all partitions instead of filtering to the current `backfill_key`. |
| `partition_filter` | `dict[str, list[str]] \| None` | Mapping of partition column names to their written values, pushed by the producing task via task values. When set, the IoManager uses these values to filter the read result. Populated automatically when `auto_filter=True`. |