Getting Started¶
1. Scaffold your pipeline project¶
```bash
uv init my-pipeline && cd my-pipeline
uv add "databricks-bundle-decorators[azure]"  # or [aws], [gcp], [polars]
uv run dbxdec init
```
**Docker deployment:** If you pre-install your package in a custom Docker image
instead of deploying a wheel, use `uv run dbxdec init --docker` to generate a
Docker-ready example. See Docker Deployment for details.
`dbxdec init` creates:

| File | Purpose |
|---|---|
| `resources/__init__.py` | Tells `databricks bundle deploy` about your jobs |
| `src/<package>/pipelines/__init__.py` | Auto-imports all pipeline files in the directory |
| `src/<package>/pipelines/example.py` | Starter pipeline with `@task`, `@job`, `job_cluster()` |
| `databricks.yaml` | Databricks Asset Bundle configuration |
| `pyproject.toml` | Updated with the pipeline package entry point |
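
Assuming the package name uv derives from `my-pipeline` is `my_pipeline`, the layout should look roughly like this (plus whatever `uv init` itself generates):

```text
my-pipeline/
├── databricks.yaml
├── pyproject.toml
├── resources/
│   └── __init__.py
└── src/
    └── my_pipeline/
        └── pipelines/
            ├── __init__.py
            └── example.py
```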
2. Define your pipeline¶
```python
# src/my_pipeline/pipelines/github_events.py
from databricks_bundle_decorators import job, job_cluster, params, task, set_task_value, for_each_task
from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

staging_io = SparkDeltaIoManager(
    base_path="abfss://datalake@mystorageaccount.dfs.core.windows.net/staging",
    mode="overwrite",
)

small_cluster = job_cluster(
    name="small_cluster",
    spark_version="16.4.x-scala2.12",
    node_type_id="Standard_E8ds_v4",
    num_workers=1,
)


@job(
    tags={"source": "github", "type": "api"},
    schedule="0 * * * *",
    params={"source_table": "raw.github_events"},
    cluster=small_cluster,
)
def github_events():
    @task(io_manager=staging_io)
    def extract():
        from pyspark.sql import SparkSession

        spark = SparkSession.getActiveSession()
        df = spark.table(params["source_table"])
        set_task_value("row_count", df.count())
        return df

    @task
    def transform(raw_df):
        raw_df.show(10)

    @for_each_task(inputs=["slack", "email", "pagerduty"], concurrency=3)
    def notify(inputs: str, raw_df):
        print(f"Sending alert for {inputs} ({raw_df.count()} rows)")

    df = extract()
    transform(df)
    notify(raw_df=df)
```
**Polars variant:**

```python
import polars as pl

from databricks_bundle_decorators import job, job_cluster, params, task, set_task_value, for_each_task
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

staging_io = PolarsParquetIoManager(
    base_path="abfss://datalake@mystorageaccount.dfs.core.windows.net/staging",
)

# ... same cluster and @job definition as above (here with a "url" param) ...
def github_events():
    @task(io_manager=staging_io)
    def extract():
        import requests

        r = requests.get(params["url"])
        df = pl.DataFrame(r.json())
        set_task_value("row_count", len(df))
        return df

    @task
    def transform(raw_df):
        print(raw_df.head(10))

    df = extract()
    transform(df)
```
**Important gotchas:** The `@job` body runs at deploy time, not on a cluster,
so keep all business logic inside `@task` functions. Also avoid reserved
parameter names (`__job_name__`, `__task_key__`, `__run_id__`, `__upstream__*`).
See How It Works — Limitations for the full list.
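
To make the first gotcha concrete, here is a minimal sketch (the job name is a placeholder, and `small_cluster` is the cluster defined in the example above):

```python
from databricks_bundle_decorators import job, params, task

@job(params={"source_table": "raw.github_events"}, cluster=small_cluster)
def deploy_time_vs_run_time():
    # Don't: Spark code placed directly in the @job body would execute
    # during `databricks bundle deploy`, on the machine running the CLI,
    # with no cluster attached:
    # spark.table(params["source_table"]).count()

    @task
    def count_rows():
        # Do: data access lives inside a @task, which runs on the
        # job cluster when the job is triggered.
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        return spark.table(params["source_table"]).count()

    count_rows()
```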
3. Deploy¶
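Decorator-defined jobs ship through the standard Databricks Asset Bundle workflow. A minimal sketch, assuming a `dev` target exists in `databricks.yaml` and that the job key matches the decorated function name:

```bash
databricks bundle validate                    # check that the generated job resources are well-formed
databricks bundle deploy -t dev               # upload the bundle and create/update the jobs
databricks bundle run -t dev github_events    # optionally trigger the new job once
```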
Pipeline Discovery¶
`dbxdec init` sets up pipeline discovery automatically. If you need to
configure it manually (e.g. in an existing project), see
Discovery in the Internals section.
Incremental adoption¶
You don't have to migrate everything at once. The Databricks CLI merges
resources from all sources, so YAML-defined jobs and decorator-defined
jobs coexist in the same bundle. Keep your existing
`resources.jobs` in `databricks.yaml` and add new jobs with decorators:
```yaml
# databricks.yaml
resources:
  jobs:
    existing_job:
      name: existing_job
      tasks:
        - task_key: ingest
          spark_python_task:
            python_file: src/etl/ingest.py

python:
  venv_path: .venv
  resources:
    - 'resources:load_resources' # new decorator-defined jobs
```
Both sets of jobs deploy together. Migrate at your own pace.