How It Works¶
Deploy and run¶
- **You write Python** — define `@task` functions inside a `@job` body and wire them by passing return values as arguments.
- **`databricks bundle deploy`** — the framework imports your pipeline modules and generates Databricks Job definitions. The `@job` body runs at import time to build the DAG, but `@task` business logic does not run at this stage.
- **Databricks runs your job** — each task executes on a cluster. The framework loads upstream data, calls your function, and persists the result for downstream tasks.
You write Python
@job / @task / job_cluster()
▼
databricks bundle deploy
→ Job definitions created in workspace
▼
Job runs on Databricks
→ Each task: load upstream data → call your function → save output
Task dependencies¶
Inside a @job body, calling a @task function doesn't execute it immediately — it records it. Passing the return value of one task call to another captures the dependency:
@job
def my_job():
    @task
    def a(): ...

    @task
    def b(data): ...

    x = a()   # records task "a"
    b(x)      # records task "b", depends on "a"
At deploy time this produces a two-task job where `b` runs after `a`. At runtime, the framework passes the output of `a` as the `data` argument to `b`.
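The record-don't-run behavior can be mimicked with a toy sketch of how `@task` calls could return `TaskProxy` placeholders and record dependency edges. This is an illustrative simplification, not the library's actual internals (see Internals for those):

```python
recorded = []

class TaskProxy:
    """Toy placeholder for a task's future output (sketch only)."""
    def __init__(self, name):
        self.name = name

def task(fn):
    # Sketch: calling the wrapped function records a DAG node
    # instead of executing fn's body.
    def record(*args, **kwargs):
        proxies = list(args) + list(kwargs.values())
        deps = [p.name for p in proxies if isinstance(p, TaskProxy)]
        recorded.append((fn.__name__, deps))
        return TaskProxy(fn.__name__)
    return record

@task
def a(): ...

@task
def b(data): ...

x = a()   # records ("a", [])
b(x)      # records ("b", ["a"])
```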
Passing data between tasks¶
There are two mechanisms, suited to different data sizes:
| Mechanism | Use case | Notes |
|---|---|---|
| `IoManager` | DataFrames, datasets, large objects | Unlimited size (external storage) |
| `set_task_value` / `get_task_value` | Row counts, status flags, iteration lists, small JSON data | JSON-serializable small values |
IoManager (large data)¶
Attach an IoManager to a task to persist its return value to external storage. Downstream tasks receive the data as a plain function argument:
from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

io = SparkDeltaIoManager(
    base_path="abfss://lake@account.dfs.core.windows.net/staging",
    mode="overwrite",
)

@job(cluster=my_cluster)  # my_cluster created elsewhere with job_cluster()
def pipeline():
    @task(io_manager=io)
    def extract():
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        return spark.table("raw.events").limit(100)

    @task
    def transform(df):
        print(f"Rows: {df.count()}")

    data = extract()     # output saved by IoManager
    transform(data)      # input loaded by upstream's IoManager
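The same contract can be sketched for any storage backend. The class below is a hypothetical local-disk IoManager for illustration only; the `save`/`load` method names are assumptions, not the library's real ABC (see Built-in IoManagers for that):

```python
import pickle
from pathlib import Path

class PickleIoManager:
    """Hypothetical IoManager sketch: persist each task's return value
    as a pickle file under base_path. Method names are assumptions."""

    def __init__(self, base_path):
        self.base_path = Path(base_path)

    def save(self, task_name, value):
        # Called after the task returns: persist its output.
        self.base_path.mkdir(parents=True, exist_ok=True)
        (self.base_path / f"{task_name}.pkl").write_bytes(pickle.dumps(value))

    def load(self, task_name):
        # Called before a downstream task runs: rehydrate the upstream output.
        return pickle.loads((self.base_path / f"{task_name}.pkl").read_bytes())
```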
Task values (small scalars)¶
For lightweight metadata (row counts, status flags), use task values:
from databricks_bundle_decorators import set_task_value, get_task_value

@task
def produce():
    set_task_value("row_count", 42)

@task
def consume():
    count = get_task_value("produce", "row_count")
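The JSON-serializable constraint can be made concrete with a small guard. `require_json_serializable` is a hypothetical helper, not part of the library:

```python
import json

def require_json_serializable(value):
    """Hypothetical guard: a task value must survive a JSON round-trip."""
    try:
        return json.loads(json.dumps(value))
    except TypeError as exc:
        raise TypeError(f"not a valid task value: {value!r}") from exc

require_json_serializable({"row_count": 42, "status": "ok"})  # fine
# require_json_serializable(spark_df)  # a DataFrame would raise TypeError
```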
Delta Write Modes & Merge¶
All Delta IoManagers accept a `mode` parameter (`"error"`, `"overwrite"`, `"append"`, `"ignore"`). For merge / upsert operations, return a merge builder from your task instead of a DataFrame — the IoManager detects the type and calls `.execute()` automatically.
See Built-in IoManagers for full details, examples, and the mode reference table.
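The detection step can be pictured as a simple type check. This is a simplified sketch of the idea, not the library's actual IoManager code; `handle_output` and `FakeMergeBuilder` are hypothetical names:

```python
def handle_output(result, write=lambda df: None):
    """Sketch: merge builders expose execute(), everything else
    is treated as a DataFrame and written with the configured mode."""
    if hasattr(result, "execute"):
        result.execute()          # merge/upsert path
        return "merged"
    write(result)                 # plain DataFrame path
    return "written"

class FakeMergeBuilder:
    """Stand-in for a Delta merge builder (hypothetical)."""
    def __init__(self):
        self.ran = False
    def execute(self):
        self.ran = True
```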
Packaging model¶
┌──────────────────────────────┐ ┌────────────────────────┐
│ databricks-bundle-decorators│ │ my-pipeline (repo) │
│ (library, PyPI) │◄────│ │
│ │ │ pyproject.toml │
│ @task, @job, job_cluster() │ │ src/my_pipeline/ │
│ IoManager ABC │ │ pipelines/ │
│ dbxdec CLI │ │ resources/__init__.py │
│ │ │ databricks.yaml │
└──────────────────────────────┘ └────────────────────────┘
Limitations¶
- **The `@job` body is for wiring only.** It runs once at import time (during `databricks bundle deploy`), not on a cluster. Keep all business logic inside `@task` functions.
- **No conditional or dynamic DAGs.** `if`/`else` or loops in the `@job` body are evaluated at import time. Put conditional logic inside a `@task` function.
- **Task arguments are symbolic.** `@task` calls return `TaskProxy` placeholders, not real data. Passing a literal value to a task call has no effect at runtime.
- **Dependencies must be direct arguments.** A `TaskProxy` hidden inside a list, dict, or other container will not register a dependency edge; use a separate parameter per upstream dependency.
- **The IoManager belongs to the producer.** Attach `io_manager=` to the task that produces data. Downstream tasks receive data as plain function arguments.
- **Names must be unique.** Job names are unique across the project; task names are unique within a job. Duplicates raise `DuplicateResourceError` at import time.
Examples of common mistakes
Nested proxies — edge NOT captured:
@job
def my_job():
    @task
    def a(): ...

    @task
    def b(inputs): ...

    result = a()
    b(inputs=[result])   # ✗ — result is nested in a list
Correct — direct keyword argument:
@job
def my_job():
    @task
    def a(): ...

    @task
    def b(a_data): ...

    result = a()
    b(a_data=result)     # ✓ — edge captured
Side effects in @job body — runs at deploy time:
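Stand-in decorators (plain Python defined inline, purely to illustrate the timing, not the library's real implementations) show why a print or table read in the `@job` body fires during deploy rather than on a cluster:

```python
calls = []

def task(fn):
    # Stand-in: the real decorator registers the function, it does not run it.
    calls.append(f"registered {fn.__name__}")
    return fn

def job(fn):
    # Stand-in: the real decorator runs the body once to wire the DAG.
    fn()
    return fn

@job
def my_job():
    calls.append("side effect")          # ✗ runs at import/deploy time
    @task
    def process():
        calls.append("business logic")   # ✓ deferred until the job runs
```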
Under the hood
For details on codegen, runtime dispatch, the registry, and pipeline discovery, see Internals.