Spark – Unity Catalog¶
Unity Catalog IoManagers work on both classic and serverless compute. UC handles authentication and access control, so no credential injection is needed.
Partitioning¶
All UC IoManagers support `partition_by` via the `@task` decorator. `"backfill_key"` is auto-injected on write and auto-filtered on read. Managed tables use `partitionBy()` with `saveAsTable()`; volume paths use `partitionBy()` with `save()`.
```python
io = SparkUCTableIoManager(
    catalog="main",
    schema="staging",
    mode="overwrite",
)

@task(io_manager=io, partition_by="backfill_key")
def extract(): ...
```
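The inject-on-write / filter-on-read behavior can be sketched in plain Python over lists of row dicts. This is an illustrative model only: the real IoManagers operate on Spark DataFrames, and these helper names are hypothetical.

```python
def inject_backfill_key(rows, backfill_key):
    # On write: stamp every row with the current partition value.
    return [{**row, "backfill_key": backfill_key} for row in rows]

def filter_backfill_key(rows, backfill_key, all_partitions=False):
    # On read: keep only the current partition, unless the consumer
    # asked for every partition.
    if all_partitions:
        return rows
    return [row for row in rows if row["backfill_key"] == backfill_key]

rows = inject_backfill_key([{"id": 1}], "2024-01-01")
rows += inject_backfill_key([{"id": 2}], "2024-01-02")
current = filter_backfill_key(rows, "2024-01-01")  # only the id=1 row
```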
Managed Tables¶
databricks_bundle_decorators.io_managers.SparkUCTableIoManager(catalog, schema, write_options=None, read_options=None, mode='error', *, auto_filter=True)¶
Bases: IoManager
Persist PySpark DataFrames as Unity Catalog managed Delta tables.
Uses `saveAsTable` / `spark.table()` with the three-level namespace `catalog.schema.task_key`.
Unity Catalog manages access control and storage location, so no credential configuration is required. Works on both classic and serverless compute.
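The three-level name can be sketched with a format string. The values here are illustrative (taken from the example below), and the task key is assumed to be the decorated function's name:

```python
catalog, schema, task_key = "main", "staging", "extract"

# The name passed to saveAsTable on write and spark.table() on read.
table_name = f"{catalog}.{schema}.{task_key}"
# "main.staging.extract"
```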
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `catalog` | `str` | Unity Catalog catalog name (e.g. `main`). | *required* |
| `schema` | `str` | Unity Catalog schema (database) name (e.g. `staging`). | *required* |
| `write_options` | `dict[str, str] \| None` | Extra Spark writer options applied via `options()`. | `None` |
| `read_options` | `dict[str, str] \| None` | Extra Spark reader options applied via `options()`. | `None` |
| `mode` | `str` | Delta write mode (`error`, `append`, `overwrite`, or `ignore`). | `'error'` |
| `auto_filter` | `bool` | When `True`, reads of `backfill_key`-partitioned data are filtered to the current partition. | `True` |
Example:

```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import SparkUCTableIoManager

io = SparkUCTableIoManager(catalog="main", schema="staging")

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)

@task
def transform(df):  # spark.table("main.staging.extract")
    df.show()
```
Source code in src/databricks_bundle_decorators/io_managers/spark_uc.py
setup()¶
Obtain the active `SparkSession`.
write(context, obj)¶
Write a PySpark DataFrame or execute a `DeltaMergeBuilder`.

- If `obj` is a `DeltaMergeBuilder`, calls `.execute()`.
- Otherwise writes via `saveAsTable` with the configured `mode`, `partition_by`, and `write_options`.

When `partition_by` includes `"backfill_key"`, the column is injected automatically from the context.
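The branch logic just described can be modeled in a few lines. This is a hypothetical sketch, not the library's actual implementation; the real manager receives Spark objects rather than these stand-ins:

```python
def dispatch_write(obj, save_as_table):
    # Merge builders carry their own write logic; run them in place.
    if type(obj).__name__ == "DeltaMergeBuilder":
        obj.execute()
        return "merged"
    # Anything else is treated as a DataFrame and handed to the writer.
    save_as_table(obj)
    return "saved"
```

Checking the class name is only a stand-in here for whatever `isinstance` test the library actually performs.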
read(context)¶
Read a Unity Catalog managed table as a PySpark DataFrame.
When `partition_by` includes `"backfill_key"`, reads are filtered to the current partition unless the upstream dependency uses `all_partitions()` or the consuming task uses `@task(all_partitions=True)`.
Volume – Delta¶
databricks_bundle_decorators.io_managers.SparkUCVolumeDeltaIoManager(catalog, schema, volume, write_options=None, read_options=None, mode='error', *, auto_filter=True)¶
Bases: IoManager
Persist PySpark DataFrames as Delta tables in UC Volumes.
Writes to `/Volumes/<catalog>/<schema>/<volume>/<task_key>` using the standard Delta format.
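The destination path follows directly from the template above. A sketch with illustrative values (taken from the example below), assuming the task key is the decorated function's name:

```python
catalog, schema, volume, task_key = "main", "staging", "raw_data", "extract"

# Delta writes go to the bare task-key directory under the volume.
delta_path = f"/Volumes/{catalog}/{schema}/{volume}/{task_key}"
# "/Volumes/main/staging/raw_data/extract"
```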
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `catalog` | `str` | Unity Catalog catalog name. | *required* |
| `schema` | `str` | Unity Catalog schema (database) name. | *required* |
| `volume` | `str` | Unity Catalog volume name. | *required* |
| `write_options` | `dict[str, str] \| None` | Extra Spark writer options applied via `options()`. | `None` |
| `read_options` | `dict[str, str] \| None` | Extra Spark reader options applied via `options()`. | `None` |
| `mode` | `str` | Delta write mode (`error`, `append`, `overwrite`, or `ignore`). | `'error'` |
| `auto_filter` | `bool` | When `True`, reads of `backfill_key`-partitioned data are filtered to the current partition. | `True` |
Example:

```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkUCVolumeDeltaIoManager,
)

io = SparkUCVolumeDeltaIoManager(
    catalog="main", schema="staging", volume="raw_data",
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)
```
setup()¶
Obtain the active `SparkSession`.
write(context, obj)¶
Write a PySpark DataFrame or execute a `DeltaMergeBuilder`.

- If `obj` is a `DeltaMergeBuilder`, calls `.execute()`.
- Otherwise writes via `save()` with the configured `mode`, `partition_by`, and `write_options`.

When `partition_by` includes `"backfill_key"`, the column is injected automatically from the context.
read(context)¶
Read Delta from a UC Volume path as a PySpark DataFrame.
When `partition_by` includes `"backfill_key"`, reads are filtered to the current partition unless the upstream dependency uses `all_partitions()` or the consuming task uses `@task(all_partitions=True)`.
Volume – Parquet¶
databricks_bundle_decorators.io_managers.SparkUCVolumeParquetIoManager(catalog, schema, volume, write_options=None, read_options=None, *, auto_filter=True)¶
Bases: IoManager
Persist PySpark DataFrames as Parquet in UC Volumes.
Writes to `/Volumes/<catalog>/<schema>/<volume>/<task_key>.parquet` using the Parquet format.
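The path differs from the Delta volume manager only by the `.parquet` suffix. A sketch with illustrative values (taken from the example below), assuming the task key is the decorated function's name:

```python
catalog, schema, volume, task_key = "main", "staging", "raw_data", "extract"

# Parquet writes append a .parquet suffix to the task-key path.
parquet_path = f"/Volumes/{catalog}/{schema}/{volume}/{task_key}.parquet"
# "/Volumes/main/staging/raw_data/extract.parquet"
```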
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `catalog` | `str` | Unity Catalog catalog name. | *required* |
| `schema` | `str` | Unity Catalog schema (database) name. | *required* |
| `volume` | `str` | Unity Catalog volume name. | *required* |
| `write_options` | `dict[str, str] \| None` | Extra Spark writer options applied via `options()`. | `None` |
| `read_options` | `dict[str, str] \| None` | Extra Spark reader options applied via `options()`. | `None` |
| `auto_filter` | `bool` | When `True`, reads of `backfill_key`-partitioned data are filtered to the current partition. | `True` |
Example:

```python
from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkUCVolumeParquetIoManager,
)

io = SparkUCVolumeParquetIoManager(
    catalog="main", schema="staging", volume="raw_data",
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)
```
setup()¶
Obtain the active `SparkSession`.
write(context, obj)¶
Write a PySpark DataFrame as Parquet to a UC Volume path.
When `partition_by` includes `"backfill_key"`, the column is injected automatically from the context.
read(context)¶
Read Parquet from a UC Volume path as a PySpark DataFrame.
When `partition_by` includes `"backfill_key"`, reads are filtered to the current partition unless the upstream dependency uses `all_partitions()` or the consuming task uses `@task(all_partitions=True)`.