Spark – Classic Compute¶
Classic compute IoManagers support credential injection via
spark.conf.set() using the spark_configs parameter. This follows
the same dict-or-callable pattern as the Polars storage_options.
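A minimal sketch of both accepted forms, assuming a Databricks runtime where dbutils.secrets is available; the storage account, secret scope, and key names are hypothetical::

    # Static form: a plain dict, applied as-is via spark.conf.set().
    _static_configs = {
        "fs.azure.account.key.myaccount.dfs.core.windows.net": "***",
    }

    # Deferred form: a zero-argument callable resolved at runtime, so the
    # secret is fetched inside the job rather than captured at deploy time.
    def _configs() -> dict[str, str]:
        return {
            "fs.azure.account.key.myaccount.dfs.core.windows.net": (
                dbutils.secrets.get(scope="lake", key="storage-account-key")
            ),
        }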
Partitioning¶
Both the Delta and Parquet IoManagers support partition_by via the
@task decorator. When partition_by includes "backfill_key", the
column is injected automatically via F.lit() (from
pyspark.sql.functions) before writing and filtered automatically on
read. Partitioning uses Spark's native partitionBy().
io = SparkDeltaIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
    spark_configs=_configs,
    mode="overwrite",
)

@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...
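For intuition, the write and read paths for a backfill_key partition are roughly equivalent to the plain PySpark below. This is an illustrative sketch of the behaviour described above, not the IoManager's actual code; the key value and target path are hypothetical::

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.getActiveSession()
    df = spark.range(10).withColumn("region", F.lit("emea"))

    backfill_key = "2024-01-01"  # hypothetical: supplied by the job run

    # Write: the partition column is injected via F.lit() and Spark's
    # native partitionBy() lays out the directories.
    (
        df.withColumn("backfill_key", F.lit(backfill_key))
        .write.mode("overwrite")
        .partitionBy("backfill_key", "region")
        .parquet("/tmp/data/extract")
    )

    # Read: the same key is filtered back automatically.
    df_back = spark.read.parquet("/tmp/data/extract").where(
        F.col("backfill_key") == backfill_key
    )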
Delta¶
databricks_bundle_decorators.io_managers.SparkDeltaIoManager(base_path, spark_configs=None, write_options=None, read_options=None, mode='error', *, auto_filter=True)¶
Bases: _SparkDeltaBase
Persist PySpark DataFrames as Delta tables on classic compute.
Credentials are injected into the Spark session via
spark.conf.set() during setup, following the same
dict-or-callable pattern as the Polars IoManagers'
storage_options.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for Delta tables. Each task creates a sub-directory named after its task key (e.g. <base_path>/<task_key>). Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters (see the sketch after this table). | required |
| spark_configs | dict[str, str] \| Callable[[], dict[str, str]] \| None | Key-value pairs applied via spark.conf.set(). Use a callable to defer secret lookup to runtime. | None |
| write_options | dict[str, str] \| None | Extra Spark writer options applied on write. | None |
| read_options | dict[str, str] \| None | Extra Spark reader options applied on read. | None |
| mode | str | Delta write mode, passed to the Spark writer (Spark accepts 'error', 'append', 'overwrite', and 'ignore'). For merge operations, ignore this parameter and return a fully-configured merge builder from the task instead (see the merge example below). | 'error' |
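A minimal sketch of a callable base_path, assuming the environment name arrives as a Databricks job parameter read via dbutils.widgets; the parameter name and path layout are hypothetical::

    def _base_path() -> str:
        env = dbutils.widgets.get("env")  # e.g. "dev" or "prod"
        return f"abfss://lake@myaccount.dfs.core.windows.net/{env}/delta"

    io = SparkDeltaIoManager(base_path=_base_path, spark_configs=_configs)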
Example::

    from pyspark.sql import SparkSession

    from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

    io = SparkDeltaIoManager(
        base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
        spark_configs={
            "fs.azure.account.key.myaccount.dfs.core.windows.net": "***",
        },
    )

    @task(io_manager=io)
    def extract():
        spark = SparkSession.getActiveSession()
        return spark.range(10)
Merge example::

    @task(io_manager=io)
    def upsert(new_data):
        from delta.tables import DeltaTable

        spark = SparkSession.getActiveSession()
        dt = DeltaTable.forPath(spark, io._uri("upsert"))
        return (
            dt.alias("t")
            .merge(new_data.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
        )
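Note that upsert returns the merge builder without calling .execute(): per the mode parameter above, the task hands the fully-configured builder back to the IoManager, which presumably executes it when persisting the output.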
spark_configs property¶
Resolve spark_configs, calling it first if it is a callable.
setup()¶
Obtain the active SparkSession and apply spark_configs.
Parquet¶
databricks_bundle_decorators.io_managers.SparkParquetIoManager(base_path, spark_configs=None, write_options=None, read_options=None, *, auto_filter=True)¶
Bases: _SparkParquetBase
Persist PySpark DataFrames as Parquet on classic compute.
Credentials are injected into the Spark session via
spark.conf.set() during setup, following the same
dict-or-callable pattern as the Polars IoManagers'
storage_options.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for Parquet files. Each task creates a sub-directory named after its task key (e.g. <base_path>/<task_key>). Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters (see the callable base_path sketch in the Delta section above). | required |
| spark_configs | dict[str, str] \| Callable[[], dict[str, str]] \| None | Key-value pairs applied via spark.conf.set(). Use a callable to defer secret lookup to runtime. | None |
| write_options | dict[str, str] \| None | Extra Spark writer options applied on write (see the sketch after this table). | None |
| read_options | dict[str, str] \| None | Extra Spark reader options applied on read (see the sketch after this table). | None |
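A minimal sketch of write_options and read_options; the option names below are standard Spark Parquet options, not specific to this library::

    io = SparkParquetIoManager(
        base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
        write_options={"compression": "zstd"},  # Parquet writer codec
        read_options={"mergeSchema": "true"},   # merge schemas across files
    )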
Example::

    from pyspark.sql import SparkSession

    from databricks_bundle_decorators.io_managers import SparkParquetIoManager

    io = SparkParquetIoManager(
        base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
        spark_configs={
            "fs.azure.account.key.myaccount.dfs.core.windows.net": "***",
        },
    )

    @task(io_manager=io)
    def extract():
        spark = SparkSession.getActiveSession()
        return spark.range(10)
spark_configs property¶
Resolve spark_configs, calling it first if it is a callable.
setup()¶
Obtain the active SparkSession and apply spark_configs.