# For-Each

Fan-out processing with `@for_each_task`: execute a task once per element in a list.
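As a rough local mental model, a for-each task behaves like calling one function per list element, with at most `concurrency` calls in flight, where every call gets the same upstream data. This is only an analogy: on Databricks each iteration is a separate run of the nested task, not a thread. The sketch uses a hypothetical `run_for_each` helper built on the standard library's `ThreadPoolExecutor`; neither it nor the toy `process` function is part of `databricks_bundle_decorators`.

```python
from concurrent.futures import ThreadPoolExecutor


def run_for_each(fn, inputs, concurrency=1, **shared):
    """Hypothetical local stand-in for a for-each task (not the library's API).

    Calls fn(item, **shared) once per element of `inputs`, with at most
    `concurrency` calls running at the same time. Every call receives the
    same shared keyword arguments, mirroring how each iteration gets the
    full upstream data.
    """
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(fn, item, **shared) for item in inputs]
        # Results are collected in input order, even if calls finish out of order.
        return [future.result() for future in futures]


def process(inputs, events):
    # Toy iteration body: count the events belonging to the current region.
    return inputs, sum(1 for event in events if event["region"] == inputs)


events = [{"region": "us-east-1"}, {"region": "eu-west-1"}, {"region": "us-east-1"}]
print(run_for_each(process, ["us-east-1", "eu-west-1", "ap-southeast-1"], concurrency=3, events=events))
# [('us-east-1', 2), ('eu-west-1', 1), ('ap-southeast-1', 0)]
```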
## Dynamic inputs (from upstream task value)
```python
from databricks_bundle_decorators import job, job_cluster, task, task_value, for_each_task, set_task_value
from databricks_bundle_decorators.io_managers import SparkDeltaIoManager

io = SparkDeltaIoManager(base_path="abfss://lake@account.dfs.core.windows.net/staging")
cluster = job_cluster(name="cluster", spark_version="16.4.x-scala2.12", node_type_id="Standard_E8ds_v4", num_workers=2)

@job(cluster=cluster)
def fan_out_job():
    @task
    def discover_regions():
        # Publish the list to iterate over as a task value keyed "regions".
        set_task_value("regions", ["us-east-1", "eu-west-1", "ap-southeast-1"])

    @task(io_manager=io)
    def load_data():
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        return spark.table("raw.events")

    # Runs once per region: `inputs` is the current element, `events` the full DataFrame.
    @for_each_task(inputs=task_value(discover_regions, "regions"), concurrency=3)
    def process(inputs: str, events):
        filtered = events.filter(events.region == inputs)
        print(f"Processing {inputs}: {filtered.count()} rows")

    # Wire the IoManager data dependency, the same way as for a @task.
    data = load_data()
    process(events=data)
```
How this works:

- `discover_regions` publishes the iteration list via `set_task_value("regions", ...)`. The key can be any string, but it must match the second argument to `task_value()`.
- `task_value(discover_regions, "regions")` creates a reference to the `"regions"` task value on the upstream task. At deploy time this generates `{{tasks.discover_regions.values.regions}}`. Each iteration of `process` receives one element of that list as its `inputs` argument.
- `events=data` is a regular IoManager data dependency: each iteration receives the full DataFrame from `load_data`. This is wired by calling the for-each function inside the `@job` body, the same way as with `@task`.
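To make the key-matching rule concrete, here is a small pure-Python sketch of the reference string described above and of how it could be resolved against published task values. Both `task_value_reference` and `resolve` are hypothetical helpers written for illustration; they are not part of `databricks_bundle_decorators`, and the real resolution is done by Databricks when the task runs.

```python
import re


def task_value_reference(task_key: str, value_key: str) -> str:
    """Hypothetical helper: build the dynamic value reference that
    task_value(<task>, <key>) is described as generating at deploy time."""
    return "{{tasks." + task_key + ".values." + value_key + "}}"


# Matches "{{tasks.<task_key>.values.<value_key>}}".
_REF = re.compile(r"^\{\{tasks\.([^.]+)\.values\.([^}]+)\}\}$")


def resolve(reference: str, published: dict) -> object:
    """Hypothetical local resolver: look up a reference in a dict of
    {task_key: {value_key: value}}, standing in for values set via set_task_value."""
    match = _REF.match(reference)
    if match is None:
        raise ValueError(f"not a task-value reference: {reference!r}")
    task_key, value_key = match.groups()
    return published[task_key][value_key]  # KeyError if the keys do not match


published = {"discover_regions": {"regions": ["us-east-1", "eu-west-1", "ap-southeast-1"]}}
ref = task_value_reference("discover_regions", "regions")
print(ref)                      # {{tasks.discover_regions.values.regions}}
print(resolve(ref, published))  # ['us-east-1', 'eu-west-1', 'ap-southeast-1']
```

In this sketch, referencing a key that was never published (for example `"region"` instead of `"regions"`) raises a `KeyError`, the local analogue of the rule that both sides must use the same key.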
## Static inputs
```python
# Reuses `cluster` and `io` from the previous example.
@job(cluster=cluster)
def static_fan_out():
    @task(io_manager=io)
    def build_report():
        from pyspark.sql import SparkSession
        spark = SparkSession.getActiveSession()
        return spark.createDataFrame([("metric", 100)], ["name", "value"])

    # One iteration per channel: `inputs` is the channel name, `report` the full DataFrame.
    @for_each_task(inputs=["slack", "email", "pagerduty"], concurrency=3)
    def notify(inputs: str, report):
        print(f"[{inputs}] Sending report with {report.count()} rows")

    r = build_report()
    notify(report=r)
```
Static inputs are passed to the decorator as a plain Python list, and the list must be JSON-serialisable. Other parameters, such as `report`, are regular IoManager data dependencies wired via the function call.
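A hypothetical pre-deploy check for the JSON-serialisable requirement might look like the following sketch. It assumes the list ends up encoded as a JSON array (Databricks for-each tasks take their `inputs` as a JSON array); `check_static_inputs` is illustrative only and is not a function provided by the library.

```python
import json


def check_static_inputs(inputs: list) -> str:
    """Hypothetical check (not the library's API): static for-each inputs must
    be a list that survives a JSON round trip. Returns the JSON array string."""
    if not isinstance(inputs, list):
        raise TypeError("static inputs must be a list")
    encoded = json.dumps(inputs)  # raises TypeError for e.g. sets or datetimes
    if json.loads(encoded) != inputs:
        raise ValueError("inputs change when round-tripped through JSON")
    return encoded


print(check_static_inputs(["slack", "email", "pagerduty"]))
# ["slack", "email", "pagerduty"]
```

The round-trip comparison also rejects values that serialise but come back different, such as tuples (decoded as lists) or dicts with non-string keys.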