Custom IoManagers
Subclass IoManager to implement your own storage backend when the
built-in IoManagers don't cover your use case
(e.g. writing to a REST API, a message queue, or a custom file format).
Asset naming
Use context.asset_name (write) and context.upstream_asset_name
(read) to derive storage paths. These return output_name when set
via @task(output_name="..."), falling back to the task key otherwise.
    def write(self, context: OutputContext, obj) -> None:
        path = f"{self.base_path}/{context.asset_name}"
        save(path, obj)

    def read(self, context: InputContext):
        path = f"{self.base_path}/{context.upstream_asset_name}"
        return load(path)
Tip
Prefer asset_name / upstream_asset_name over task_key /
upstream_task_key when building storage paths.
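The fallback rule can be illustrated without the library. The sketch below uses a hypothetical stand-in dataclass, `FakeOutputContext`, that mirrors only the documented behaviour of `asset_name`; it is not the real `OutputContext`, and `storage_path` is likewise an illustrative helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeOutputContext:
    """Hypothetical stand-in for OutputContext (illustration only)."""

    task_key: str
    output_name: Optional[str] = None

    @property
    def asset_name(self) -> str:
        # output_name wins when set; otherwise fall back to the task key.
        return self.output_name if self.output_name is not None else self.task_key


def storage_path(base_path: str, context: FakeOutputContext) -> str:
    """Build a storage path from the resolved asset name."""
    return f"{base_path.rstrip('/')}/{context.asset_name}"


named = FakeOutputContext(task_key="load_orders", output_name="orders")
unnamed = FakeOutputContext(task_key="load_orders")
```

Here `storage_path("/mnt/data", named)` resolves to the `orders` segment, while the unnamed context falls back to `load_orders`. Deriving paths from the asset name keeps them stable if the task function is later renamed but its `output_name` is kept.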
Partitioning support
When building a custom IoManager, use context.backfill_key to scope
storage to the current partition, and context.all_partitions to
support cross-partition reads (triggered by the all_partitions()
wrapper or @task(all_partitions=True)).
Auto-filtering via task values
By default (auto_filter=True), the runtime pushes the distinct
partition values written by the producer to downstream consumers via
Databricks task values. On the read side, these values are available
in context.partition_filter — a dict[str, list[str]] mapping
column names to the values that were written.
To opt into auto-filtering in a custom IoManager:
- Accept auto_filter in your __init__ and set self.auto_filter.
- Override _extract_partition_values to return the distinct values for each partition column after a write.
- In read(), check context.partition_filter and apply it.
    from databricks_bundle_decorators import IoManager, OutputContext, InputContext

    class MyIoManager(IoManager):
        def __init__(self, base_path: str, *, auto_filter: bool = True) -> None:
            self.base_path = base_path
            self.auto_filter = auto_filter

        def write(self, context: OutputContext, obj):
            path = f"/data/{context.task_key}"
            if context.backfill_key:
                path = f"{path}/backfill_key={context.backfill_key}"
            save(path, obj)

        def _extract_partition_values(
            self, context: OutputContext
        ) -> dict[str, list[str]]:
            path = f"/data/{context.task_key}"
            return extract_distinct_values(path, context.partition_by)

        def read(self, context: InputContext):
            path = f"/data/{context.upstream_task_key}"
            if context.all_partitions:
                return load_all(path)  # Read all partition directories
            if context.partition_filter:
                return load_filtered(path, context.partition_filter)
            if context.backfill_key:
                path = f"{path}/backfill_key={context.backfill_key}"
            return load(path)
If you set auto_filter=False, context.partition_filter will
always be None and only the backfill_key fallback applies.
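The helpers used in the example above (extract_distinct_values, load_filtered) are placeholders. As a rough illustration of what they need to do, the sketch below implements the same two steps for in-memory rows (a list of dicts). The function names `distinct_partition_values` and `filter_rows` and the row layout are assumptions made for this example, not part of the library.

```python
def distinct_partition_values(rows, partition_by):
    """Return the sorted distinct values per partition column, as strings."""
    return {
        column: sorted({str(row[column]) for row in rows})
        for column in partition_by
    }


def filter_rows(rows, partition_filter):
    """Keep rows whose value in every filtered column was among those written."""
    return [
        row
        for row in rows
        if all(str(row[col]) in allowed for col, allowed in partition_filter.items())
    ]


# Rows written by the producer in this run.
written = [
    {"region": "eu", "day": "2024-01-01", "amount": 10},
    {"region": "eu", "day": "2024-01-02", "amount": 20},
]
# The stored data also contains a row from an earlier run.
stored = written + [{"region": "us", "day": "2024-01-01", "amount": 5}]

partition_filter = distinct_partition_values(written, ["region", "day"])
visible = filter_rows(stored, partition_filter)
```

Note that a dict[str, list[str]] filter is applied column by column, so it matches the cross product of the listed values rather than the exact combinations that were written. For most partition layouts this is a harmless over-approximation.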
Delta replaceWhere example
All built-in Delta IoManagers automatically apply replaceWhere (Spark)
or predicate (delta-rs / Polars) when mode="overwrite" is combined
with partition_by — see Partition-scoped overwrite.
If you are writing a custom Delta IoManager, you should apply the same pattern to avoid destroying data in other partitions during backfill runs:
    from databricks_bundle_decorators import IoManager, OutputContext, InputContext
    from pyspark.sql import SparkSession, functions as F

    class DeltaReplaceWhereIoManager(IoManager):
        """Write to one Delta table, overwriting only the current partition."""

        def __init__(self, base_path: str) -> None:
            self.base_path = base_path

        def write(self, context: OutputContext, obj) -> None:
            uri = f"{self.base_path}/{context.task_key}"
            bk = context.backfill_key or "unknown"
            obj = obj.withColumn("backfill_key", F.lit(bk))
            (
                obj.write.format("delta")
                .mode("overwrite")
                .option("replaceWhere", f"backfill_key = '{bk}'")
                .partitionBy("backfill_key")
                .save(uri)
            )

        def read(self, context: InputContext):
            spark = SparkSession.getActiveSession()
            uri = f"{self.base_path}/{context.upstream_task_key}"
            return spark.read.format("delta").load(uri)
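Because the replaceWhere predicate above interpolates the backfill key into a SQL string, it may be worth validating the key first. The following is a minimal sketch, assuming backfill keys are simple tokens such as dates or identifiers; the helper name `build_replace_where` and the allowed character set are assumptions for illustration.

```python
import re

# Assumption: backfill keys only contain letters, digits, and a few separators.
_SAFE_KEY = re.compile(r"[A-Za-z0-9_.:\-]+")


def build_replace_where(backfill_key: str, column: str = "backfill_key") -> str:
    """Return a replaceWhere predicate for one partition, rejecting unsafe keys."""
    if not _SAFE_KEY.fullmatch(backfill_key):
        raise ValueError(f"unsafe backfill key: {backfill_key!r}")
    return f"{column} = '{backfill_key}'"
```

The returned string could then be passed to `.option("replaceWhere", ...)` in place of the inline f-string. Rejecting unexpected characters avoids having to reason about SQL quoting rules at all.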
Write retries
To handle transient or concurrent-write errors, configure RetryConfig on your IoManager. The framework will retry failed writes with exponential backoff (powered
by tenacity):
    from databricks_bundle_decorators import IoManager, RetryConfig, OutputContext, InputContext

    class MyIoManager(IoManager):
        def __init__(self, base_path: str) -> None:
            self.base_path = base_path
            self.retry = RetryConfig(max_attempts=5, delay=1.0, backoff_factor=2.0)

        def write(self, context: OutputContext, obj) -> None:
            ...

        def read(self, context: InputContext):
            ...
RetryConfig parameters:
| Parameter | Default | Description |
|---|---|---|
| max_attempts | 3 | Total number of attempts (including the first try) |
| delay | 1.0 | Initial delay in seconds between retries |
| backoff_factor | 2.0 | Multiplier applied to delay after each failure (1s → 2s → 4s …) |
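To make the arithmetic concrete, the sketch below computes the waits implied by these parameters, plus the optional max_delay cap that RetryConfig also accepts. It is a plain-Python illustration of the formula described in the table; `backoff_delays` is a hypothetical helper, and the real schedule is produced by tenacity inside the framework and may differ in detail.

```python
def backoff_delays(max_attempts=3, delay=1.0, backoff_factor=2.0, max_delay=None):
    """Return the waits (in seconds) between consecutive attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    waits = []
    current = delay
    for _ in range(max_attempts - 1):  # no wait after the final attempt
        waits.append(current if max_delay is None else min(current, max_delay))
        current *= backoff_factor
    return waits


default_waits = backoff_delays()
five_attempts = backoff_delays(max_attempts=5)
capped = backoff_delays(max_attempts=5, max_delay=5.0)
```

With five attempts there are four waits, so the worst case adds 1 + 2 + 4 + 8 = 15 seconds of sleeping before the final failure is raised; capping at 5 seconds reduces the last wait from 8 to 5.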
API Reference
databricks_bundle_decorators.io_manager.IoManager
Bases: ABC
Base class for managing data transfer between tasks.
Each @task can optionally declare an IoManager that controls how
its return value is persisted and how downstream tasks read that data.
Lifecycle
IoManager instances are created at import time during both deploy
and runtime phases. __init__ must therefore be safe to run locally
without a Databricks runtime — do not import modules like
pyspark.dbutils or establish cluster-only connections there.
Instead, override setup for any initialisation that requires a
Databricks runtime environment. The framework calls setup()
exactly once per instance, at runtime only, before the first
read or write invocation.
Example:

    from typing import Any

    import polars as pl
    from databricks_bundle_decorators import IoManager, OutputContext, InputContext

    class DeltaIoManager(IoManager):
        def __init__(self, catalog: str, schema: str):
            self.catalog = catalog
            self.schema = schema

        def setup(self) -> None:
            # Safe here: only called at runtime on Databricks.
            from pyspark.dbutils import DBUtils  # noqa: F401
            self.dbutils = DBUtils(...)

        def write(self, context: OutputContext, obj: Any) -> None:
            table = f"{self.catalog}.{self.schema}.{context.task_key}"
            obj.write_delta(table, mode="overwrite")

        def read(self, context: InputContext) -> Any:
            table = f"{self.catalog}.{self.schema}.{context.upstream_task_key}"
            return pl.read_delta(table)
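The lifecycle contract (cheap __init__ at import time, setup exactly once before the first read or write) can be pictured with a small stand-in. In the sketch below the once-only guard lives inside the class purely for illustration; in the real framework that guard is the framework's responsibility, and `LazySetupManager` and `_ensure_setup` are hypothetical names.

```python
class LazySetupManager:
    """Hypothetical stand-in showing the documented setup-once contract."""

    def __init__(self) -> None:
        # Cheap and safe to run at import time: no cluster access here.
        self.setup_calls = 0
        self._ready = False
        self.store = {}

    def setup(self) -> None:
        # Runtime-only initialisation would go here.
        self.setup_calls += 1

    def _ensure_setup(self) -> None:
        # Assumed framework behaviour: call setup() once, before first use.
        if not self._ready:
            self.setup()
            self._ready = True

    def write(self, key, obj) -> None:
        self._ensure_setup()
        self.store[key] = obj

    def read(self, key):
        self._ensure_setup()
        return self.store[key]


manager = LazySetupManager()
calls_before_use = manager.setup_calls  # constructing does not trigger setup
manager.write("orders", [1, 2, 3])
result = manager.read("orders")
```

After one write and one read, `setup_calls` is still 1, which is the property that makes it safe to put expensive or cluster-only initialisation in setup rather than __init__.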
auto_filter = True
class-attribute
instance-attribute
When True, partition values are pushed via task values on write and used to auto-filter reads. Set to False to disable.
retry = None
class-attribute
instance-attribute
Optional retry configuration for write operations.
When set, write calls that raise exceptions will be retried
with exponential backoff. Useful for handling transient Delta
commit conflicts during concurrent backfill runs.
setup()
Initialise runtime-only resources.
Override this method to perform initialisation that requires a
Databricks cluster environment (Spark sessions, DBUtils, secret
scopes, etc.). The framework guarantees this is called once
before the first read or write, and only at
runtime — never during databricks bundle deploy.
The default implementation does nothing.
Source code in src/databricks_bundle_decorators/io_manager.py
write_with_retry(context, obj)
Call write with optional retry logic.
If retry is configured, retries on any exception using
exponential backoff (powered by tenacity). Otherwise, calls
write directly.
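The control flow described here can be sketched in plain Python. This is not the framework's implementation (which is described as tenacity-based); `call_with_retry` and its injectable `sleep` parameter are assumptions chosen so the behaviour can be observed without actually waiting.

```python
import time


def call_with_retry(func, *, max_attempts=3, delay=1.0, backoff_factor=2.0,
                    sleep=time.sleep):
    """Call func(), retrying on any exception with exponential backoff."""
    wait = delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(wait)
            wait *= backoff_factor


attempts = []
waits = []


def flaky_write():
    # Fails twice with a simulated transient error, then succeeds.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient commit conflict")
    return "written"


# Record the requested waits instead of sleeping.
outcome = call_with_retry(flaky_write, max_attempts=5, sleep=waits.append)
```

Because every exception type is retried, a write that fails deterministically (for example, a schema mismatch) will also consume all attempts before the error surfaces, so write implementations should be safe to call more than once.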
Source code in src/databricks_bundle_decorators/io_manager.py
write(context, obj)
abstractmethod
databricks_bundle_decorators.io_manager.OutputContext(job_name, task_key, run_id, output_name=None, backfill_key=None, partition_by=None)
dataclass
Context provided to IoManager.write when persisting a task's return value.
asset_name
property
The resolved name for the output asset (table, path segment, etc.).
Returns output_name if set on the @task decorator,
otherwise falls back to task_key.
databricks_bundle_decorators.io_manager.InputContext(job_name, task_key, upstream_task_key, run_id, upstream_output_name=None, expected_type=None, backfill_key=None, all_partitions=False, partition_by=None, partition_filter=None)
dataclass
Context provided to IoManager.read when retrieving upstream output.
Attributes:
| Name | Type | Description |
|---|---|---|
| expected_type | type \| None | The type annotation of the downstream task's parameter, if available. IoManagers can use this to return the appropriate type (e.g. …). |
| backfill_key | str \| None | The backfill key of the current run. IoManagers use this to scope reads to the correct partition. |
| all_partitions | bool | When True, the IoManager should read all partitions instead of filtering to the current … |
| partition_filter | dict[str, list[str]] \| None | Mapping of partition column names to their written values, pushed by the producing task via task values. When set, the IoManager uses these values to filter the read result. Populated automatically when … |
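As an illustration of how expected_type might be used, the sketch below dispatches on it inside a read function. `FakeInputContext` is a hypothetical stand-in for InputContext, the in-memory `ROWS` store replaces real storage, and the choice of dict versus list is an assumption made for the example; the source does not say which types the built-in IoManagers distinguish.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeInputContext:
    """Hypothetical stand-in for InputContext (illustration only)."""

    upstream_task_key: str
    expected_type: Optional[type] = None


ROWS = {"orders": [("a", 1), ("b", 2)]}  # pretend storage keyed by task


def read(context: FakeInputContext) -> Any:
    rows = ROWS[context.upstream_task_key]
    # Return the shape the downstream parameter is annotated with, if known.
    if context.expected_type is dict:
        return dict(rows)
    return list(rows)  # default when no annotation is available


as_dict = read(FakeInputContext("orders", expected_type=dict))
as_list = read(FakeInputContext("orders"))
```

Falling back to a default type when expected_type is None keeps the IoManager usable with downstream tasks that have no parameter annotation.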
upstream_asset_name
property
The resolved name for the upstream output asset.
Returns the upstream task's output_name if set,
otherwise falls back to upstream_task_key.
databricks_bundle_decorators.io_manager.RetryConfig(max_attempts=3, delay=1.0, backoff_factor=2.0, max_delay=None)
dataclass
Configuration for retrying IoManager write operations.
Useful when concurrent writes to the same Delta table cause
transient CommitFailedError conflicts (e.g. during backfills
of unpartitioned dimension tables).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_attempts | int | Total number of attempts (including the first try). Must be >= 1. | 3 |
| delay | float | Initial delay in seconds between retry attempts. | 1.0 |
| backoff_factor | float | Multiplier applied to delay after each failed attempt. For example, … | 2.0 |
| max_delay | float \| None | Maximum delay in seconds between retry attempts. Caps the exponential growth so that individual waits never exceed this value. | None |