# databricks-bundle-decorators
Decorator-based framework for defining Databricks jobs and tasks as Python code. Define pipelines using `@task`, `@job`, and `job_cluster()` — they compile into Databricks Asset Bundle resources.
## Why databricks-bundle-decorators?
Writing Databricks jobs in raw YAML is tedious and disconnects task logic from orchestration configuration. databricks-bundle-decorators lets you express both in Python:
- **Airflow TaskFlow-inspired pattern**: define `@task` functions inside a `@job` body; dependencies are captured automatically from call arguments.
- **IoManager pattern**: large data (DataFrames, datasets) flows between tasks through external storage automatically.
- **Control-flow dependencies**: use `@task(depends_on=...)` for ordering constraints without data transfer.
- **Explicit task values**: small scalars (`str`, `int`, `float`, `bool`) can be passed between tasks via `set_task_value`/`get_task_value`, like Airflow XComs.
- **Pure Python**: write your jobs and tasks as decorated functions, run `databricks bundle deploy`, and the framework generates all Databricks Job configurations for you.
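The task-value pattern can be illustrated with an in-memory stand-in. The dict and the two functions below are illustrative stubs, not the library's implementation — in the framework, values travel through Databricks task values between tasks:

```python
# Illustrative in-memory stand-ins for the framework's task-value API.
# The real set_task_value/get_task_value persist values across task
# boundaries via Databricks task values; this dict is only a sketch.
_task_values: dict[str, object] = {}


def set_task_value(key: str, value: object) -> None:
    # Only small scalars (str, int, float, bool) are supported.
    if not isinstance(value, (str, int, float, bool)):
        raise TypeError("task values must be small scalars")
    _task_values[key] = value


def get_task_value(key: str) -> object:
    return _task_values[key]


# An upstream task publishes a scalar...
set_task_value("row_count", 100)
# ...and a downstream task reads it, like an Airflow XCom.
print(get_task_value("row_count"))  # → 100
```

The scalar-only restriction is the point of the pattern: anything larger than a flag or a count should flow through an io manager instead.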
## Installation
With cloud-specific extras:
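A minimal sketch, assuming the package is published on PyPI under its repository name; the `azure` extra below is a hypothetical example of a cloud-specific extra:

```shell
# Base install (assumes the PyPI name matches the repository name)
pip install databricks-bundle-decorators

# With a cloud-specific extra (the extra name here is hypothetical)
pip install "databricks-bundle-decorators[azure]"
```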
## Quick Example
```python
from databricks_bundle_decorators import job, job_cluster, params, task
from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

io = SparkDeltaIoManager(
    base_path="abfss://lake@account.dfs.core.windows.net/staging",
    mode="overwrite",
)

small_cluster = job_cluster(
    name="small",
    spark_version="16.4.x-scala2.12",
    node_type_id="Standard_E8ds_v4",
    num_workers=1,
)


@job(schedule="0 * * * *", cluster=small_cluster)
def my_pipeline():
    @task(io_manager=io)
    def extract():
        from pyspark.sql import SparkSession

        spark = SparkSession.getActiveSession()
        return spark.table("raw.events").limit(100)

    @task
    def transform(df):
        print(f"Rows: {df.count()}")

    data = extract()
    transform(data)
```
### Polars variant
```python
from databricks_bundle_decorators import job, job_cluster, task
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(base_path="abfss://lake@account.dfs.core.windows.net/staging")

small_cluster = job_cluster(
    name="small",
    spark_version="16.4.x-scala2.12",
    node_type_id="Standard_E8ds_v4",
    num_workers=1,
)


@job(schedule="0 * * * *", cluster=small_cluster)
def my_pipeline():
    @task(io_manager=io)
    def extract():
        import polars as pl

        return pl.DataFrame({"x": [1, 2, 3]})

    @task
    def transform(df):
        print(df.head())

    data = extract()
    transform(data)
```