Spark – Serverless Compute

Serverless compute does not support spark.conf.set() for credential injection. The base_path must therefore be a storage location registered as a Unity Catalog (UC) external location, because serverless compute can only access paths governed by UC. Cloud storage URIs that are not registered as external locations will fail at runtime.

Partitioning

Partitioning behaves the same as on classic compute: partition_by is specified on the @task decorator and uses Spark's native partitionBy().
"backfill_key" is auto-injected on write and auto-filtered on read.

Delta

Merge / Upsert

mode="merge" is not a valid write mode and will raise a ValueError. To perform merge/upsert operations, return a DeltaMerge from your task function. See Delta Write Modes & Merge for full examples.
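The rejection of mode="merge" can be pictured with a small stand-alone check. This is only a sketch: the function below is a hypothetical stand-in for the library's private _validate_delta_mode helper, whose body is not shown on this page, and the set of accepted modes is an assumption based on Spark's standard save modes.

```python
# Hypothetical stand-in for the library's private validation helper.
# Assumption: the accepted modes mirror Spark's standard DataFrameWriter
# save modes; the real accepted set may differ.
ASSUMED_VALID_MODES = frozenset(
    {"error", "errorifexists", "overwrite", "append", "ignore"}
)


def validate_delta_mode(mode: str, owner: str) -> None:
    """Raise ValueError for 'merge' or any mode outside the assumed set."""
    if mode == "merge":
        raise ValueError(
            f"{owner}: mode='merge' is not a valid write mode; "
            "return a DeltaMerge from the task function instead."
        )
    if mode not in ASSUMED_VALID_MODES:
        raise ValueError(f"{owner}: unsupported write mode {mode!r}.")
```

Checking the mode in the constructor means a misconfigured IO manager fails when it is defined, not later when the first write is attempted.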

databricks_bundle_decorators.io_managers.SparkServerlessDeltaIoManager(base_path, write_options=None, read_options=None, mode='error', *, auto_filter=True, retry=None)

Bases: _SparkDeltaBase

Persist PySpark DataFrames as Delta tables on serverless compute.

Serverless compute does not support spark.conf.set() for credential injection. The base_path must be a storage location registered as a Unity Catalog external location — serverless compute can only access paths governed by UC. Arbitrary cloud storage URIs that are not registered as external locations will fail at runtime.

Parameters:

base_path (str | Callable[[], str]), required
    Root URI for Delta tables. Must be a path governed by a Unity Catalog external location (e.g. abfss://container@account.dfs.core.windows.net/staging). Can also be a callable that returns a string, resolved lazily at runtime.

write_options (dict[str, str] | None), default None
    Extra Spark writer options applied via .option(k, v).

read_options (dict[str, str] | None), default None
    Extra Spark reader options applied via .option(k, v).

mode (str), default 'error'
    Delta write mode ("error", "overwrite", "append", etc.). Defaults to "error" to prevent accidental data loss. For merge operations, return a DeltaMerge from your task instead.

retry (RetryConfig | None), default None
    Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient Delta commit conflicts during concurrent backfill runs on unpartitioned tables. Defaults to None (no retries).
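To make "retried with exponential backoff" concrete, here is a minimal pure-Python sketch of the idea. It is not the library's RetryConfig and it does not use tenacity; the function name, parameter names, and default values are all assumptions chosen for illustration.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    *,
    max_attempts: int = 5,          # assumed default, for illustration only
    base_delay: float = 0.5,        # seconds to wait after the first failure
    max_delay: float = 30.0,        # upper bound on any single wait
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation(), doubling the wait after each failed attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Waits grow as 0.5, 1.0, 2.0, ... and are capped at max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
    raise AssertionError("unreachable")
```

Backoff like this suits transient commit conflicts because concurrent writers that collide once are unlikely to collide again after waiting progressively longer. Production retry libraries usually also add random jitter, which this sketch omits to stay deterministic.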
Example

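from pyspark.sql import SparkSession

# Assumed import path for the @task decorator; adjust if your version differs.
from databricks_bundle_decorators import task
from databricks_bundle_decorators.io_managers import (
    SparkServerlessDeltaIoManager,
)

# base_path must be governed by a Unity Catalog external location.
io = SparkServerlessDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)


@task(io_manager=io)
def extract():
    # The returned DataFrame is persisted as a Delta table by the IO manager.
    spark = SparkSession.getActiveSession()
    return spark.range(10)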
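Because base_path may also be a callable that is resolved lazily, the path can be looked up at run time instead of at import time. The sketch below illustrates that pattern in isolation: resolve_base_path is a hypothetical helper (the library's own resolution logic is not shown on this page), and STAGING_ROOT is an assumed environment variable name.

```python
import os
from typing import Callable, Union

PathSource = Union[str, Callable[[], str]]


def resolve_base_path(base_path: PathSource) -> str:
    """Return the path, calling base_path first if it is a callable."""
    return base_path() if callable(base_path) else base_path


def staging_root_from_env() -> str:
    # STAGING_ROOT is a hypothetical variable name used only for illustration.
    return os.environ["STAGING_ROOT"]
```

Passing staging_root_from_env (the function itself, not its result) as base_path would defer the environment lookup until the path is actually needed, which helps when the value is only available on the cluster and not where the job is defined.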
Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    mode: str = "error",
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    _validate_delta_mode(mode, type(self).__name__)
    self._base_path = base_path
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self._mode = mode
    self.auto_filter = auto_filter
    self.retry = retry

setup()

Obtain the active SparkSession (no config injection).

Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def setup(self) -> None:
    """Obtain the active SparkSession (no config injection)."""
    from pyspark.sql import SparkSession  # noqa: PLC0415

    self._spark = SparkSession.getActiveSession()
    if self._spark is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)

Parquet

databricks_bundle_decorators.io_managers.SparkServerlessParquetIoManager(base_path, write_options=None, read_options=None, *, auto_filter=True, retry=None)

Bases: _SparkParquetBase

Persist PySpark DataFrames as Parquet on serverless compute.

Serverless compute does not support spark.conf.set() for credential injection. The base_path must be a storage location registered as a Unity Catalog external location — serverless compute can only access paths governed by UC. Arbitrary cloud storage URIs that are not registered as external locations will fail at runtime.

Parameters:

base_path (str | Callable[[], str]), required
    Root URI for Parquet files. Must be a path governed by a Unity Catalog external location (e.g. abfss://container@account.dfs.core.windows.net/staging). Can also be a callable that returns a string, resolved lazily at runtime.

write_options (dict[str, str] | None), default None
    Extra Spark writer options applied via .option(k, v).

read_options (dict[str, str] | None), default None
    Extra Spark reader options applied via .option(k, v).

retry (RetryConfig | None), default None
    Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by tenacity). Useful for handling transient write conflicts during concurrent backfill runs. Defaults to None (no retries).
Example

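from pyspark.sql import SparkSession

# Assumed import path for the @task decorator; adjust if your version differs.
from databricks_bundle_decorators import task
from databricks_bundle_decorators.io_managers import (
    SparkServerlessParquetIoManager,
)

# base_path must be governed by a Unity Catalog external location.
io = SparkServerlessParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)


@task(io_manager=io)
def extract():
    # The returned DataFrame is persisted as Parquet by the IO manager.
    spark = SparkSession.getActiveSession()
    return spark.range(10)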
Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    *,
    auto_filter: bool = True,
    retry: RetryConfig | None = None,
) -> None:
    self._base_path = base_path
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter
    self.retry = retry

setup()

Obtain the active SparkSession (no config injection).

Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def setup(self) -> None:
    """Obtain the active SparkSession (no config injection)."""
    from pyspark.sql import SparkSession  # noqa: PLC0415

    self._spark = SparkSession.getActiveSession()
    if self._spark is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)