Spark – Serverless Compute

Serverless compute does not support spark.conf.set() for credential injection. The base_path must be a storage location registered as a Unity Catalog external location — serverless compute can only access paths governed by UC. Arbitrary cloud storage URIs that are not registered as external locations will fail at runtime.

Partitioning

Same behaviour as classic compute: partition_by is specified on the @task decorator and uses Spark's native partitionBy().
The "backfill_key" column is auto-injected on write and auto-filtered on read.

Delta

databricks_bundle_decorators.io_managers.SparkServerlessDeltaIoManager(base_path, write_options=None, read_options=None, mode='error', *, auto_filter=True)

Bases: _SparkDeltaBase

Persist PySpark DataFrames as Delta tables on serverless compute.

Serverless compute does not support spark.conf.set() for credential injection. The base_path must be a storage location registered as a Unity Catalog external location — serverless compute can only access paths governed by UC. Arbitrary cloud storage URIs that are not registered as external locations will fail at runtime.

Parameters:

base_path (str | Callable[[], str], required)
    Root URI for Delta tables. Must be a path governed by a Unity Catalog
    external location (e.g. abfss://container@account.dfs.core.windows.net/staging).
    Can also be a callable that returns a string, resolved lazily at runtime.

write_options (dict[str, str] | None, default None)
    Extra Spark writer options applied via .option(k, v).

read_options (dict[str, str] | None, default None)
    Extra Spark reader options applied via .option(k, v).

mode (str, default 'error')
    Delta write mode ("error", "overwrite", "append", etc.). Defaults to
    "error" to prevent accidental data loss. For merge operations, return a
    DeltaMergeBuilder from your task instead.
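Because base_path may be a callable, environment-dependent storage roots can be resolved when the task actually runs rather than when the bundle is defined. A minimal sketch of that pattern — the DBD_ENV variable name is illustrative, not part of the library:

```python
import os

# base_path may be a plain string or a zero-argument callable that is
# resolved lazily at runtime. A callable lets the storage root depend on
# the environment the task runs in. DBD_ENV is an illustrative name.

def staging_root() -> str:
    env = os.environ.get("DBD_ENV", "dev")
    return f"abfss://lake@myaccount.dfs.core.windows.net/{env}/staging"

def resolve(base_path) -> str:
    """Lazy resolution as documented: call it if it is callable."""
    return base_path() if callable(base_path) else base_path

print(resolve(staging_root))  # resolved only when invoked, not at definition time
```

Pass the function itself (base_path=staging_root), not its result, so resolution is deferred.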
Example

::

from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkServerlessDeltaIoManager,
)

io = SparkServerlessDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)
Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    mode: str = "error",
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self._mode = mode
    self.auto_filter = auto_filter

setup()

Obtain the active SparkSession (no config injection).

Source code in src/databricks_bundle_decorators/io_managers/spark_delta.py
def setup(self) -> None:
    """Obtain the active SparkSession (no config injection)."""
    from pyspark.sql import SparkSession

    self._spark = SparkSession.getActiveSession()
    if self._spark is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)

Parquet

databricks_bundle_decorators.io_managers.SparkServerlessParquetIoManager(base_path, write_options=None, read_options=None, *, auto_filter=True)

Bases: _SparkParquetBase

Persist PySpark DataFrames as Parquet on serverless compute.

Serverless compute does not support spark.conf.set() for credential injection. The base_path must be a storage location registered as a Unity Catalog external location — serverless compute can only access paths governed by UC. Arbitrary cloud storage URIs that are not registered as external locations will fail at runtime.

Parameters:

base_path (str | Callable[[], str], required)
    Root URI for Parquet files. Must be a path governed by a Unity Catalog
    external location (e.g. abfss://container@account.dfs.core.windows.net/staging).
    Can also be a callable that returns a string, resolved lazily at runtime.

write_options (dict[str, str] | None, default None)
    Extra Spark writer options applied via .option(k, v).

read_options (dict[str, str] | None, default None)
    Extra Spark reader options applied via .option(k, v).
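The write_options and read_options dicts are applied entry by entry through Spark's chained .option(k, v) API. The sketch below models that with a stub writer class (StubWriter and apply_options are illustrative stand-ins, not library or Spark classes):

```python
# Sketch of how an options dict maps onto Spark's chained writer API.
# StubWriter stands in for a DataFrameWriter; only .option(k, v) is modeled.

class StubWriter:
    def __init__(self) -> None:
        self.options: dict[str, str] = {}

    def option(self, key: str, value: str) -> "StubWriter":
        # Like Spark's .option, return the writer to allow chaining.
        self.options[key] = value
        return self

def apply_options(writer: StubWriter, opts: dict[str, str]) -> StubWriter:
    """Apply each entry via .option(k, v), as described for the managers."""
    for k, v in opts.items():
        writer = writer.option(k, v)
    return writer

w = apply_options(StubWriter(), {"compression": "zstd", "mergeSchema": "true"})
print(w.options)  # → {'compression': 'zstd', 'mergeSchema': 'true'}
```

Note that Spark option values are passed as strings (e.g. "true"), matching the dict[str, str] parameter type.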
Example

::

from pyspark.sql import SparkSession

from databricks_bundle_decorators.io_managers import (
    SparkServerlessParquetIoManager,
)

io = SparkServerlessParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract():
    spark = SparkSession.getActiveSession()
    return spark.range(10)
Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def __init__(
    self,
    base_path: str | Callable[[], str],
    write_options: dict[str, str] | None = None,
    read_options: dict[str, str] | None = None,
    *,
    auto_filter: bool = True,
) -> None:
    self._base_path = base_path
    self._write_options = write_options or {}
    self._read_options = read_options or {}
    self.auto_filter = auto_filter

setup()

Obtain the active SparkSession (no config injection).

Source code in src/databricks_bundle_decorators/io_managers/spark_parquet.py
def setup(self) -> None:
    """Obtain the active SparkSession (no config injection)."""
    from pyspark.sql import SparkSession

    self._spark = SparkSession.getActiveSession()
    if self._spark is None:
        msg = "No active SparkSession found."
        raise RuntimeError(msg)