
Spark – Classic Compute

Classic compute IoManagers support credential injection via spark.conf.set() using the spark_configs parameter. This follows the same dict-or-callable pattern as the Polars storage_options.

Partitioning

Both Delta and Parquet IoManagers support partition_by via the @task decorator. When partition_by includes "backfill_key", the column is auto-injected via F.lit() before writing, and auto-filtered on read. Partitioning uses Spark's native partitionBy().

io = SparkDeltaIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
    spark_configs=_configs,
    mode="overwrite",
)

@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...

Delta

databricks_bundle_decorators.io_managers.SparkDeltaIoManager(base_path, spark_configs=None, write_options=None, read_options=None, mode='error', *, auto_filter=True)

Bases: _SparkDeltaBase

Persist PySpark DataFrames as Delta tables on classic compute.

Credentials are injected into the Spark session via spark.conf.set() during setup, following the same dict-or-callable pattern as the Polars IoManagers' storage_options.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Delta tables. Each task writes to a sub-directory of this path named after its task key (e.g. with base_path abfss://container@account.dfs.core.windows.net/staging, a task named extract is stored under .../staging/extract).

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = SparkDeltaIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
spark_configs dict[str, str] | Callable[[], dict[str, str]] | None

Key-value pairs applied via spark.conf.set() before the first read or write. Can be a plain dict, a callable that returns a dict (resolved lazily at runtime), or None.

Use a callable to defer secret lookup to runtime::

from databricks_bundle_decorators import get_dbutils

def _configs() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {
        "fs.azure.account.key.myaccount.dfs.core.windows.net": key,
    }

io = SparkDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    spark_configs=_configs,
)
None
write_options dict[str, str] | None

Extra Spark writer options applied via .option(k, v).

None
read_options dict[str, str] | None

Extra Spark reader options applied via .option(k, v).

None
mode str

Delta write mode. One of "error" (default), "overwrite", "append", or "ignore". Defaults to "error" to prevent accidental data loss.

For merge operations, ignore this parameter and return a fully-configured DeltaMergeBuilder from your task instead. The IoManager will call .execute() on it automatically.

'error'
Example

::

from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

io = SparkDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    spark_configs={
        "fs.azure.account.key.myaccount.dfs.core.windows.net": "***",
    },
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)

Merge example::

    @task(io_manager=io)
    def upsert(new_data):
        from delta.tables import DeltaTable
        spark = SparkSession.getActiveSession()
        dt = DeltaTable.forPath(spark, io._uri("upsert"))
        return (
            dt.alias("t")
            .merge(new_data.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
        )
Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    spark_configs: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    mode: str = "error",
    *,
    auto_filter: bool = True,
) -> None:
    """Initialise the Delta IoManager.

    Delegates path/option/mode handling to the shared Delta base class and
    keeps ``spark_configs`` (a dict, a factory callable, or None) for lazy
    resolution by the ``spark_configs`` property during ``setup()``.
    """
    base_kwargs = {
        "write_options": write_options,
        "read_options": read_options,
        "mode": mode,
        "auto_filter": auto_filter,
    }
    super().__init__(base_path, **base_kwargs)
    # Stored unresolved: a callable here defers secret lookup to runtime.
    self._spark_configs = spark_configs

spark_configs property

Resolve spark_configs: if it is a callable, call it and return the result; otherwise return it as-is.

setup()

Obtain the active SparkSession and apply spark_configs.

Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def setup(self) -> None:
    """Bind the active SparkSession and inject ``spark_configs`` into it."""
    from pyspark.sql import SparkSession

    session = SparkSession.getActiveSession()
    if session is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)
    self._spark = session

    # ``spark_configs`` resolves a callable lazily; None/empty means
    # there is nothing to apply to the session.
    for conf_key, conf_value in (self.spark_configs or {}).items():
        session.conf.set(conf_key, conf_value)

Parquet

databricks_bundle_decorators.io_managers.SparkParquetIoManager(base_path, spark_configs=None, write_options=None, read_options=None, *, auto_filter=True)

Bases: _SparkParquetBase

Persist PySpark DataFrames as Parquet on classic compute.

Credentials are injected into the Spark session via spark.conf.set() during setup, following the same dict-or-callable pattern as the Polars IoManagers' storage_options.

Parameters:

Name Type Description Default
base_path str | Callable[[], str]

Root URI for Parquet files. Each task writes to a sub-directory of this path named after its task key (e.g. with base_path abfss://container@account.dfs.core.windows.net/staging, a task named extract is stored under .../staging/extract).

Can also be a callable that returns a string, resolved lazily at runtime. Use this for multi-environment deployments where the path depends on job parameters::

from databricks_bundle_decorators import params

io = SparkParquetIoManager(
    base_path=lambda: f"abfss://lake@{params['env']}account.dfs.core.windows.net/data",
)
required
spark_configs dict[str, str] | Callable[[], dict[str, str]] | None

Key-value pairs applied via spark.conf.set() before the first read or write. Can be a plain dict, a callable that returns a dict (resolved lazily at runtime), or None.

Use a callable to defer secret lookup to runtime::

from databricks_bundle_decorators import get_dbutils

def _configs() -> dict[str, str]:
    dbutils = get_dbutils()
    key = dbutils.secrets.get(scope="kv", key="storage-key")
    return {
        "fs.azure.account.key.myaccount.dfs.core.windows.net": key,
    }

io = SparkParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    spark_configs=_configs,
)
None
write_options dict[str, str] | None

Extra Spark writer options applied via .option(k, v).

None
read_options dict[str, str] | None

Extra Spark reader options applied via .option(k, v).

None
Example

::

from databricks_bundle_decorators.io_managers import SparkParquetIoManager

io = SparkParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    spark_configs={
        "fs.azure.account.key.myaccount.dfs.core.windows.net": "***",
    },
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)
Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    spark_configs: dict[str, str] | Callable[[], dict[str, str]] | None = None,
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    *,
    auto_filter: bool = True,
) -> None:
    """Initialise the Parquet IoManager.

    Delegates path/option handling to the shared Parquet base class and
    keeps ``spark_configs`` (a dict, a factory callable, or None) for lazy
    resolution by the ``spark_configs`` property during ``setup()``.
    """
    base_kwargs = {
        "write_options": write_options,
        "read_options": read_options,
        "auto_filter": auto_filter,
    }
    super().__init__(base_path, **base_kwargs)
    # Stored unresolved: a callable here defers secret lookup to runtime.
    self._spark_configs = spark_configs

spark_configs property

Resolve spark_configs: if it is a callable, call it and return the result; otherwise return it as-is.

setup()

Obtain the active SparkSession and apply spark_configs.

Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def setup(self) -> None:
    """Bind the active SparkSession and inject ``spark_configs`` into it."""
    from pyspark.sql import SparkSession

    session = SparkSession.getActiveSession()
    if session is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)
    self._spark = session

    # ``spark_configs`` resolves a callable lazily; None/empty means
    # there is nothing to apply to the session.
    for conf_key, conf_value in (self.spark_configs or {}).items():
        session.conf.set(conf_key, conf_value)