Spark – Unity Catalog
Unity Catalog (UC) IoManagers work on both classic and serverless compute. UC handles authentication and access control, so no credential injection is needed.
Partitioning
All UC IoManagers support partition_by, set via the @task decorator.
The "backfill_key" column is injected automatically on write and filtered automatically on read.
Managed tables use partitionBy() with saveAsTable(); volume paths use partitionBy() with save().
```python
from databricks_bundle_decorators.io_managers import SparkUCTableIoManager

io = SparkUCTableIoManager(
    catalog="main",
    schema="staging",
    mode="overwrite",
)

@task(io_manager=io, partition_by="backfill_key")
def extract(): ...
```
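As a rough mental model of the write side, the sketch below adds the run's backfill_key to every row and then buckets rows by that value, loosely mirroring what partitionBy("backfill_key") does on disk. It uses plain Python dicts instead of Spark DataFrames; inject_backfill_key and group_by_partition are hypothetical helpers, not part of databricks_bundle_decorators.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


def inject_backfill_key(rows: list[Row], backfill_key: str) -> list[Row]:
    """Return copies of rows with the run's partition value added as a column."""
    return [{**row, "backfill_key": backfill_key} for row in rows]


def group_by_partition(rows: list[Row], column: str = "backfill_key") -> dict[Any, list[Row]]:
    """Bucket rows by partition value, loosely mirroring partitionBy(column)."""
    partitions: dict[Any, list[Row]] = defaultdict(list)
    for row in rows:
        partitions[row[column]].append(row)
    return dict(partitions)


# Two backfill runs writing the same logical output.
jan1 = inject_backfill_key([{"id": 1}, {"id": 2}], "2024-01-01")
jan2 = inject_backfill_key([{"id": 3}], "2024-01-02")
table = group_by_partition(jan1 + jan2)
print(sorted(table))  # ['2024-01-01', '2024-01-02']
```

Each run only contributes rows tagged with its own key, which is what makes the read-side filter described below possible.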
Managed Tables
Merge / Upsert
mode="merge" is not a valid write mode; passing it raises a ValueError.
To perform a merge/upsert, return a DeltaMerge from your task function instead.
See Delta Write Modes & Merge for full examples.
This applies to all Delta-backed UC IoManagers: SparkUCTableIoManager and SparkUCVolumeDeltaIoManager.
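The rule above can be pictured as a simple up-front check on mode. The sketch below is a hypothetical validate_write_mode helper, not the library's code; the allowed set is Spark's built-in DataFrameWriter save modes, and the set the library actually accepts may differ.

```python
# Spark's built-in DataFrameWriter save modes (assumed set; the library may differ).
SPARK_SAVE_MODES = frozenset({"append", "overwrite", "error", "errorifexists", "ignore"})


def validate_write_mode(mode: str) -> str:
    """Hypothetical check: reject mode="merge" and point users to DeltaMerge."""
    if mode == "merge":
        raise ValueError(
            'mode="merge" is not a valid write mode; '
            "return a DeltaMerge from the task instead."
        )
    if mode not in SPARK_SAVE_MODES:
        raise ValueError(f"Unsupported write mode: {mode!r}")
    return mode


print(validate_write_mode("overwrite"))  # overwrite
```

Failing at configuration time keeps a misconfigured IoManager from silently falling back to a plain overwrite or append.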
databricks_bundle_decorators.io_managers.SparkUCTableIoManager(catalog, schema, write_options=None, read_options=None, mode='error', *, location=None, auto_filter=True, retry=None)
Bases: IoManager
Persist PySpark DataFrames as Unity Catalog Delta tables.
Uses saveAsTable / spark.table() with the three-level
namespace catalog.schema.name.
By default creates managed tables (Unity Catalog controls
storage). Set location to create external tables at a
user-specified storage path.
The table name defaults to the task key but can be overridden
per-task via @task(output_name="...").
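The naming rules above, combined with the location behaviour shown in the examples, can be summarised as a small pure function. resolve_table is a hypothetical helper written for illustration; it only reproduces the documented outcomes (task key or output_name as the table name, and location/name as the external path).

```python
from typing import Optional


def resolve_table(
    catalog: str,
    schema: str,
    task_key: str,
    output_name: Optional[str] = None,
    location: Optional[str] = None,
) -> tuple[str, Optional[str]]:
    """Return (three-level table name, external path or None) for a task's output."""
    name = output_name or task_key  # output_name overrides the task key
    full_name = f"{catalog}.{schema}.{name}"
    # Managed tables: Unity Catalog chooses the storage, so there is no explicit path.
    path = f"{location.rstrip('/')}/{name}" if location else None
    return full_name, path


print(resolve_table("main", "staging", "extract"))
# ('main.staging.extract', None)
print(resolve_table("main", "bronze", "extract_customers",
                    output_name="customers", location="s3://my-bucket/delta"))
# ('main.bronze.customers', 's3://my-bucket/delta/customers')
```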
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| catalog | str | Unity Catalog catalog name. | required |
| schema | str | Unity Catalog schema (database) name. | required |
| location | str \| None | Base storage path for external tables. When set, each table is stored under this path; for example, location="s3://my-bucket/delta" with table customers is stored at s3://my-bucket/delta/customers. | None |
| write_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame writer. | None |
| read_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame reader. | None |
| mode | str | Delta write mode, such as "error" (the default) or "overwrite". "merge" raises a ValueError; return a DeltaMerge instead. | 'error' |
| retry | `RetryConfig` \| None | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff. | None |
Example
Managed table (default):

```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import SparkUCTableIoManager

io = SparkUCTableIoManager(catalog="main", schema="staging")

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)

@task
def transform(df):  # df is read via spark.table("main.staging.extract")
    df.show()
```

External table with custom name:

```python
io = SparkUCTableIoManager(
    catalog="main",
    schema="bronze",
    location="s3://my-bucket/delta",
)

@task(io_manager=io, output_name="customers")
def extract_customers():  # table: main.bronze.customers
    ...                   # path:  s3://my-bucket/delta/customers
```
Source code in src/databricks_bundle_decorators/io_managers/spark_uc.py
setup()
Obtain the active SparkSession.
write(context, obj)
Write a PySpark DataFrame or DeltaMerge.
- If obj is a DeltaMerge, builds and executes a merge.
- Otherwise, writes via saveAsTable with the configured mode, partition_by, and write_options.
When partition_by includes "backfill_key", the column
is injected automatically from the context.
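The dispatch described above is a type check on the object the task returns. The sketch below records which path a write would take instead of calling Spark. MergeRequest is a hypothetical stand-in for DeltaMerge (the real class and its fields are covered under Delta Write Modes & Merge), and plan_write and WritePlan are illustrative names only.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class MergeRequest:
    """Hypothetical stand-in for DeltaMerge; the real class has its own fields."""
    source: Any
    condition: str


@dataclass
class WritePlan:
    """What a write would do, recorded instead of executed."""
    action: str                       # "merge" or "saveAsTable"
    target: str                       # catalog.schema.name
    mode: Optional[str] = None
    partition_by: list[str] = field(default_factory=list)
    options: dict[str, str] = field(default_factory=dict)


def plan_write(
    obj: Any,
    target: str,
    mode: str = "error",
    partition_by: Optional[list[str]] = None,
    write_options: Optional[dict[str, str]] = None,
) -> WritePlan:
    """Choose merge vs. saveAsTable based on the returned object's type."""
    if isinstance(obj, MergeRequest):
        # In this sketch a merge is described by the request alone; mode is not recorded.
        return WritePlan(action="merge", target=target)
    return WritePlan(
        action="saveAsTable",
        target=target,
        mode=mode,
        partition_by=list(partition_by or []),
        options=dict(write_options or {}),
    )


print(plan_write(MergeRequest(None, "t.id = s.id"), "main.staging.extract").action)  # merge
```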
read(context)
Read a Unity Catalog table as a PySpark DataFrame.
Works for both managed and external tables: spark.table() resolves the table through the UC catalog regardless of its storage location.
When partition_by includes "backfill_key", reads are
filtered to the current partition unless the upstream
dependency uses all_partitions() or the consuming
task uses @task(all_partitions=True).
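The read-side rule above reduces to three cases. The sketch below models them over plain dict rows; filter_for_read and its flag names are hypothetical, chosen to mirror all_partitions() on the upstream dependency and @task(all_partitions=True) on the consumer. It deliberately does not model the auto_filter constructor argument, whose behaviour this page does not describe.

```python
from typing import Any

Row = dict[str, Any]


def filter_for_read(
    rows: list[Row],
    partition_by: list[str],
    current_key: str,
    upstream_all_partitions: bool = False,
    task_all_partitions: bool = False,
) -> list[Row]:
    """Hypothetical model of the documented read-side partition filtering."""
    if "backfill_key" not in partition_by:
        return rows  # not partitioned on backfill_key: nothing to filter
    if upstream_all_partitions or task_all_partitions:
        return rows  # the consumer opted into every partition
    return [row for row in rows if row["backfill_key"] == current_key]


rows = [
    {"id": 1, "backfill_key": "2024-01-01"},
    {"id": 2, "backfill_key": "2024-01-02"},
]
print(filter_for_read(rows, ["backfill_key"], "2024-01-02"))
# [{'id': 2, 'backfill_key': '2024-01-02'}]
```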
External Tables
Set location to create external tables backed by storage you control:
```python
io = SparkUCTableIoManager(
    catalog="main",
    schema="bronze",
    location="s3://my-bucket/delta",
)

@task(io_manager=io, output_name="customers")
def extract_customers():
    ...  # table: main.bronze.customers
         # path:  s3://my-bucket/delta/customers
```
The path must be registered as a UC external location.
Reads use spark.table(), so the storage location is transparent to downstream tasks.
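Unity Catalog performs the authoritative check when the table is created. The sketch below only illustrates the containment idea (same scheme and bucket, and a path at or below the location URL) as a hypothetical pre-flight check; is_under_external_location is an illustrative name, and the list of registered URLs is assumed input that you would obtain from your workspace configuration.

```python
from urllib.parse import urlparse


def is_under_external_location(path: str, registered: list[str]) -> bool:
    """Hypothetical check: does `path` fall inside one of the registered location URLs?"""
    target = urlparse(path)
    for url in registered:
        loc = urlparse(url)
        base = loc.path.rstrip("/")
        same_bucket = (target.scheme, target.netloc) == (loc.scheme, loc.netloc)
        # Require a path-segment boundary so "/delta-old" does not match "/delta".
        if same_bucket and (target.path == base or target.path.startswith(base + "/")):
            return True
    return False


print(is_under_external_location("s3://my-bucket/delta/customers", ["s3://my-bucket/delta"]))  # True
```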
Volume – Delta
databricks_bundle_decorators.io_managers.SparkUCVolumeDeltaIoManager(catalog, schema, volume, write_options=None, read_options=None, mode='error', *, auto_filter=True, retry=None)
Bases: IoManager
Persist PySpark DataFrames as Delta tables in UC Volumes.
Writes to /Volumes/<catalog>/<schema>/<volume>/<task_key>
using the standard Delta format.
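The documented path layout is a fixed template over the constructor arguments and the task key. volume_path below is a hypothetical helper that reproduces it; the optional suffix covers the ".parquet" variant used by SparkUCVolumeParquetIoManager.

```python
def volume_path(catalog: str, schema: str, volume: str, task_key: str, suffix: str = "") -> str:
    """Build /Volumes/<catalog>/<schema>/<volume>/<task_key><suffix>."""
    return f"/Volumes/{catalog}/{schema}/{volume}/{task_key}{suffix}"


print(volume_path("main", "staging", "raw_data", "extract"))
# /Volumes/main/staging/raw_data/extract
print(volume_path("main", "staging", "raw_data", "extract", suffix=".parquet"))
# /Volumes/main/staging/raw_data/extract.parquet
```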
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| catalog | str | Unity Catalog catalog name. | required |
| schema | str | Unity Catalog schema (database) name. | required |
| volume | str | Unity Catalog volume name. | required |
| write_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame writer. | None |
| read_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame reader. | None |
| mode | str | Delta write mode, such as "error" (the default) or "overwrite". "merge" raises a ValueError; return a DeltaMerge instead. | 'error' |
| retry | `RetryConfig` \| None | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff. | None |
Example
```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkUCVolumeDeltaIoManager,
)

io = SparkUCVolumeDeltaIoManager(
    catalog="main",
    schema="staging",
    volume="raw_data",
)

@task(io_manager=io)
def extract():  # written to /Volumes/main/staging/raw_data/extract
    spark = SparkSession.getActiveSession()
    return spark.range(10)
```
setup()
Obtain the active SparkSession.
write(context, obj)
Write a PySpark DataFrame or DeltaMerge.
- If obj is a DeltaMerge, builds and executes a merge.
- Otherwise, writes via save() with the configured mode, partition_by, and write_options.
When partition_by includes "backfill_key", the column
is injected automatically from the context.
read(context)
Read Delta from a UC Volume path as a PySpark DataFrame.
When partition_by includes "backfill_key", reads are
filtered to the current partition unless the upstream
dependency uses all_partitions() or the consuming
task uses @task(all_partitions=True).
Volume – Parquet
databricks_bundle_decorators.io_managers.SparkUCVolumeParquetIoManager(catalog, schema, volume, write_options=None, read_options=None, *, auto_filter=True, retry=None)
Bases: IoManager
Persist PySpark DataFrames as Parquet in UC Volumes.
Writes to /Volumes/<catalog>/<schema>/<volume>/<task_key>.parquet
using the Parquet format.
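When partition_by is set, Spark's partitionBy() writes Hive-style col=value subdirectories beneath the output path, one level per partition column in partitionBy() order. partition_dir below is a hypothetical helper showing where a given backfill partition would land; it ignores the escaping Spark applies to special characters in partition values.

```python
def partition_dir(base: str, partition_values: dict[str, str]) -> str:
    """Hive-style directory for one partition, columns in partitionBy() order."""
    parts = [f"{col}={val}" for col, val in partition_values.items()]
    return "/".join([base.rstrip("/"), *parts])


base = "/Volumes/main/staging/raw_data/extract.parquet"
print(partition_dir(base, {"backfill_key": "2024-01-01"}))
# /Volumes/main/staging/raw_data/extract.parquet/backfill_key=2024-01-01
```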
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| catalog | str | Unity Catalog catalog name. | required |
| schema | str | Unity Catalog schema (database) name. | required |
| volume | str | Unity Catalog volume name. | required |
| write_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame writer. | None |
| read_options | dict[str, str] \| None | Extra options passed to the Spark DataFrame reader. | None |
| retry | `RetryConfig` \| None | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff. | None |
Example
```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkUCVolumeParquetIoManager,
)

io = SparkUCVolumeParquetIoManager(
    catalog="main",
    schema="staging",
    volume="raw_data",
)

@task(io_manager=io)
def extract():  # written to /Volumes/main/staging/raw_data/extract.parquet
    spark = SparkSession.getActiveSession()
    return spark.range(10)
```
setup()
Obtain the active SparkSession.
write(context, obj)
Write a PySpark DataFrame as Parquet to a UC Volume path.
When partition_by includes "backfill_key", the column
is injected automatically from the context.
read(context)
Read Parquet from a UC Volume path as a PySpark DataFrame.
When partition_by includes "backfill_key", reads are
filtered to the current partition unless the upstream
dependency uses all_partitions() or the consuming
task uses @task(all_partitions=True).