Built-in IoManagers
Ready-to-use IoManager implementations for common data formats and
compute types. Pick one, pass it to @task(io_manager=...), and the
framework handles reading and writing data between tasks automatically.
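The hand-off described above can be pictured with a toy model. This is a minimal sketch, not the library's implementation: `InMemoryIoManager` and `run_pipeline` are hypothetical names, outputs are kept in a dict instead of Parquet or Delta files, and wiring inputs by parameter name is an assumption made only for illustration.

```python
import inspect
from typing import Any, Callable, Dict


class InMemoryIoManager:
    """Hypothetical stand-in for an IoManager: keeps each task's output in a dict."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    def write(self, name: str, value: Any) -> None:
        # A real IoManager would typically persist to storage at a path derived from `name`.
        self._store[name] = value

    def read(self, name: str) -> Any:
        return self._store[name]


def run_pipeline(tasks: Dict[str, Callable[..., Any]], io: InMemoryIoManager) -> None:
    """Run tasks in insertion order, wiring inputs by parameter name (an assumption)."""
    for name, fn in tasks.items():
        # Load each upstream output whose task name matches a parameter of `fn`.
        kwargs = {param: io.read(param) for param in inspect.signature(fn).parameters}
        # Persist this task's return value so downstream tasks can read it.
        io.write(name, fn(**kwargs))


manager = InMemoryIoManager()
run_pipeline(
    {
        "extract": lambda: [1, 2, 3],
        "double": lambda extract: [x * 2 for x in extract],
    },
    manager,
)
print(manager.read("double"))  # [2, 4, 6]
```

The task code never opens a file itself; it only returns a value and receives values, which is the separation the IoManager provides.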
Choose an IoManager based on your compute type and preferred data format.
| Compute | Format | IoManager |
|---|---|---|
| Polars | Parquet | PolarsParquetIoManager |
| Polars | Delta | PolarsDeltaIoManager |
| Polars | JSON (NDJSON) | PolarsJsonIoManager |
| Polars | CSV | PolarsCsvIoManager |
| Spark – Classic | Delta | SparkDeltaIoManager |
| Spark – Classic | Parquet | SparkParquetIoManager |
| Spark – Serverless | Delta | SparkServerlessDeltaIoManager |
| Spark – Serverless | Parquet | SparkServerlessParquetIoManager |
| Spark – Unity Catalog | Managed Tables | SparkUCTableIoManager |
| Spark – Unity Catalog | Volume – Delta | SparkUCVolumeDeltaIoManager |
| Spark – Unity Catalog | Volume – Parquet | SparkUCVolumeParquetIoManager |
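If a pipeline picks its IoManager from configuration, the table above can be encoded as a lookup. This is a hypothetical helper, not part of the library: the class names are copied from the table, while the lowercase keys and `choose_io_manager` are invented for illustration. It returns class names rather than classes so the sketch runs without the package installed.

```python
# Hypothetical helper: the IoManager table above encoded as a lookup.
# Class names come from the table; the keys and the helper are illustrative only.
IO_MANAGERS = {
    ("polars", "parquet"): "PolarsParquetIoManager",
    ("polars", "delta"): "PolarsDeltaIoManager",
    ("polars", "json"): "PolarsJsonIoManager",  # newline-delimited JSON
    ("polars", "csv"): "PolarsCsvIoManager",
    ("spark-classic", "delta"): "SparkDeltaIoManager",
    ("spark-classic", "parquet"): "SparkParquetIoManager",
    ("spark-serverless", "delta"): "SparkServerlessDeltaIoManager",
    ("spark-serverless", "parquet"): "SparkServerlessParquetIoManager",
    ("spark-uc", "managed-table"): "SparkUCTableIoManager",
    ("spark-uc", "volume-delta"): "SparkUCVolumeDeltaIoManager",
    ("spark-uc", "volume-parquet"): "SparkUCVolumeParquetIoManager",
}


def choose_io_manager(compute: str, fmt: str) -> str:
    """Return the IoManager class name for a (compute, format) pair, case-insensitively."""
    key = (compute.strip().lower(), fmt.strip().lower())
    if key not in IO_MANAGERS:
        raise ValueError(f"no built-in IoManager for compute={compute!r}, format={fmt!r}")
    return IO_MANAGERS[key]


print(choose_io_manager("Spark-Serverless", "Delta"))  # SparkServerlessDeltaIoManager
```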
Delta Write Modes & Merge
All Delta IoManagers accept a `mode` parameter that controls how data
is written. The default is `"error"`, which raises if the table
already exists; this prevents accidental overwrites. Set `mode`
explicitly when you want a different behaviour:
```python
from databricks_bundle_decorators.io_managers import PolarsDeltaIoManager

io = PolarsDeltaIoManager(
    base_path="abfss://lake@account.dfs.core.windows.net/staging",
    mode="overwrite",  # or "append", "ignore"; the default is "error"
)
```
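The four modes differ only in what happens when the target already exists. The toy model below mirrors the mode table in plain Python so the behaviour can be checked without a Delta table. It is a sketch under stated assumptions: `write_with_mode` is a hypothetical function, the dict stands in for storage, and `FileExistsError` is this sketch's choice, not necessarily what the IoManager raises.

```python
from typing import Dict, List

Rows = List[dict]
VALID_MODES = ("error", "overwrite", "append", "ignore")


def write_with_mode(store: Dict[str, Rows], target: str, rows: Rows, mode: str = "error") -> None:
    """Toy model of the Delta write modes; `store` stands in for the storage location."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode {mode!r}; expected one of {VALID_MODES}")
    if target not in store:
        store[target] = list(rows)  # a missing target is created in every mode
    elif mode == "error":
        raise FileExistsError(f"target {target!r} already exists")
    elif mode == "overwrite":
        store[target] = list(rows)  # replace the target completely
    elif mode == "append":
        store[target].extend(rows)  # add rows to the existing target
    # mode == "ignore": the existing target is left untouched


store: Dict[str, Rows] = {}
write_with_mode(store, "staging", [{"id": 1}])                    # created
write_with_mode(store, "staging", [{"id": 2}], mode="append")     # [{'id': 1}, {'id': 2}]
write_with_mode(store, "staging", [{"id": 3}], mode="ignore")     # unchanged
write_with_mode(store, "staging", [{"id": 4}], mode="overwrite")  # [{'id': 4}]
print(store["staging"])  # [{'id': 4}]
```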
For merge / upsert operations, return a merge builder from your
task instead of a DataFrame. The IoManager detects the type and
calls .execute() automatically:
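With Polars (`deltalake`):

```python
import polars as pl
from deltalake import DeltaTable

# `task` is the framework's decorator (import omitted); `io` is the PolarsDeltaIoManager above.
@task(io_manager=io)
def upsert(new_data: pl.DataFrame):
    dt = DeltaTable(io._uri("upsert"))  # existing Delta table for the "upsert" task
    return (
        dt.merge(
            source=new_data,
            predicate="t.id = s.id",
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
    )  # returns TableMerger; the IoManager calls .execute()
```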
With Spark (`delta-spark`):

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Here `io` is assumed to be a Spark Delta IoManager, e.g. SparkDeltaIoManager.
@task(io_manager=io)
def upsert(new_data):
    spark = SparkSession.getActiveSession()
    dt = DeltaTable.forPath(spark, io._uri("upsert"))
    return (
        dt.alias("t")
        .merge(new_data.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
    )  # returns DeltaMergeBuilder; the IoManager calls .execute()
```
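Both builders express the same upsert: rows whose `id` matches are updated, and the rest are inserted. The plain-Python model below shows the effect on a small example. It is a sketch only: `upsert_rows` is a hypothetical name, and it assumes source and target share a schema and that keys are unique. A real Delta merge may fail when several source rows match the same target row, whereas this model silently keeps the last one.

```python
from typing import Any, Dict, List


def upsert_rows(target: List[dict], source: List[dict], key: str = "id") -> List[dict]:
    """Model of a merge on `t.<key> = s.<key>` with update-all / insert-all clauses."""
    merged: Dict[Any, dict] = {row[key]: dict(row) for row in target}
    for row in source:
        # Matched key: update-all replaces the target row's columns in place.
        # Unmatched key: insert-all appends the source row at the end.
        merged[row[key]] = dict(row)
    return list(merged.values())


target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
source = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
print(upsert_rows(target, source))
# [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'B'}, {'id': 3, 'v': 'c'}]
```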
| Mode | Behaviour |
|---|---|
| "error" (default) | Raise if the target already exists |
| "overwrite" | Replace the target completely |
| "append" | Add rows to the existing target |
| "ignore" | Silently skip if the target already exists |
| (merge builder) | Return a TableMerger / DeltaMergeBuilder; mode is ignored |
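The last row of the table can be pictured as a type check inside the IoManager. A minimal sketch, assuming duck typing on an `execute` attribute (the library may use `isinstance` checks instead); `handle_output`, `FakeMerger`, and the `write` callback are hypothetical names used only for illustration.

```python
from typing import Any, Callable

Writer = Callable[[Any, str], None]


def handle_output(result: Any, write: Writer, mode: str = "error") -> str:
    """Hypothetical dispatch: execute merge builders, otherwise write with `mode`."""
    execute = getattr(result, "execute", None)
    if callable(execute):
        # Merge-builder path: the builder performs its own write, so `mode` is ignored.
        execute()
        return "merged"
    # DataFrame path: delegate to the format-specific writer with the configured mode.
    write(result, mode)
    return "written"


class FakeMerger:
    """Stand-in for a TableMerger / DeltaMergeBuilder; records whether execute() ran."""

    def __init__(self) -> None:
        self.executed = False

    def execute(self) -> None:
        self.executed = True


writes = []
merger = FakeMerger()
print(handle_output(merger, lambda df, m: writes.append((df, m)), mode="overwrite"))  # merged
print(handle_output([1, 2], lambda df, m: writes.append((df, m)), mode="append"))  # written
print(writes)  # [([1, 2], 'append')]
```

Because the merge path never reaches `write`, the configured `mode` has no effect there, which is what the table states.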