Skip to content

Jobs

databricks_bundle_decorators.decorators.job(fn=None, *, params=None, cluster=None, libraries=None, backfill=None, **kwargs)

job(
    *,
    params: dict[str, str] | None = ...,
    cluster: ClusterMeta | None = ...,
    libraries: list | None = ...,
    backfill: BackfillDef | None = ...,
    **kwargs: Unpack[JobConfig],
) -> _JobDecorator
job(fn: types.FunctionType) -> Callable[..., Any]

Register a function as a Databricks job.

The function body is executed once at deploy time (when databricks bundle deploy imports your module — not at Databricks runtime). Inside the body, @task-decorated functions are defined and called. Each call returns a TaskProxy; passing a proxy to another task call records the dependency edge.

Parameters:

Name Type Description Default
params dict[str, str] | None

Default values for job-level parameters. Accessible inside task functions via from databricks_bundle_decorators import params.

None
cluster ClusterMeta | None

A ClusterMeta returned by job_cluster() to use as the shared job cluster for all tasks.

None
backfill BackfillDef | None

A BackfillDef that declares the universe of valid backfill_key values for this job. The dbxdec backfill CLI command uses this to enumerate keys when submitting bulk runs. Has no effect on runtime behaviour.

None
libraries list | None

Library dependencies to attach to each task. When None (the default), the framework uses [Library(whl="dist/*.whl")] which is appropriate for standard wheel-based deployments. Set to [] when the package is pre-installed in a custom Docker image. You may also pass explicit Library objects for PyPI or Maven dependencies.

None
**kwargs Unpack[JobConfig]

Any SDK-native Job fields (e.g. tags, schedule, max_concurrent_runs, timeout_seconds, email_notifications). These are forwarded directly to the databricks.bundles.jobs.Job constructor at deploy time. See JobConfig for the full list of supported fields.

{}
Source code in src/databricks_bundle_decorators/decorators.py
def job(
    fn: types.FunctionType | None = None,
    *,
    params: dict[str, str] | None = None,
    cluster: ClusterMeta | None = None,
    libraries: list | None = None,
    backfill: BackfillDef | None = None,
    **kwargs: Unpack[JobConfig],
):
    """Register a function as a Databricks job.

    The decorated function runs exactly once at deploy time — when
    ``databricks bundle deploy`` imports the module — never on a
    Databricks cluster.  While its body executes, every call to a
    ``@task``-decorated function returns a `TaskProxy`; feeding one
    proxy into another task call records a dependency edge in the DAG.

    Parameters
    ----------
    params:
        Default values for job-level parameters, readable inside task
        functions via ``from databricks_bundle_decorators import params``.
    cluster:
        A `ClusterMeta` produced by `job_cluster()`, used as the shared
        job cluster for every task.
    libraries:
        Libraries attached to each task.  ``None`` (the default) means
        ``[Library(whl="dist/*.whl")]``, appropriate for wheel-based
        deployments.  Pass ``[]`` when the package ships inside a custom
        Docker image, or explicit ``Library`` objects for PyPI/Maven
        dependencies.
    backfill:
        A `BackfillDef` declaring the universe of valid
        ``backfill_key`` values for this job.  Only the ``dbxdec
        backfill`` CLI consumes it when enumerating keys for bulk runs;
        runtime behaviour is unaffected.
    **kwargs:
        Any SDK-native ``Job`` fields (e.g. ``tags``, ``schedule``,
        ``max_concurrent_runs``, ``timeout_seconds``,
        ``email_notifications``), forwarded verbatim to the
        ``databricks.bundles.jobs.Job`` constructor at deploy time.
        See `JobConfig` for the full list of supported fields.
    """

    def _decorate(fn: types.FunctionType) -> Callable[..., Any]:
        global _current_job_name
        job_name = fn.__name__

        # A job name may be registered only once per bundle.
        if job_name in _JOB_REGISTRY:
            raise DuplicateResourceError(
                f"Duplicate job '{job_name}'. Each job must have a unique name."
            )

        # Reject reserved / malformed parameter names up front.
        if params:
            _validate_user_params(params, f"@job('{job_name}')")

        # Guard against the common mistake of passing a cluster *name*
        # instead of the job_cluster() return value.
        if cluster is not None and not isinstance(cluster, ClusterMeta):
            raise TypeError(
                f"@job(cluster=...) expects a ClusterMeta returned by "
                f"job_cluster(), got {type(cluster).__name__!r}. "
                f"Pass the job_cluster() return value directly instead "
                f"of a string."
            )

        if backfill is not None and not isinstance(backfill, BackfillDef):
            raise TypeError(
                f"@job(backfill=...) expects a BackfillDef instance "
                f"(e.g. DailyBackfill, StaticBackfill), "
                f"got {type(backfill).__name__!r}."
            )

        # When a backfill is declared the backfill-key parameter must
        # exist; inject it with an empty default unless the user set it.
        effective_params: dict[str, str] = dict(params) if params else {}
        if backfill is not None:
            effective_params.setdefault(BACKFILL_KEY_PARAM, "")

        # Reset the per-job collectors, then run the body once so task
        # calls can populate them.
        for collector in (
            _current_job_tasks,
            _current_job_dag,
            _current_job_edges,
            _current_job_for_each,
            _current_job_all_partitions,
        ):
            collector.clear()
        _current_job_name = job_name

        try:
            fn()
        finally:
            _current_job_name = None

        # Snapshot the collectors before any other job body can run.
        dag = dict(_current_job_dag)
        dag_edges = dict(_current_job_edges)
        for_each_tasks = dict(_current_job_for_each)
        all_partitions_edges = dict(_current_job_all_partitions)

        # Tasks that were defined but never called still belong in the
        # DAG, carrying any declared depends_on control-flow edges.
        for task_key, task_meta in _current_job_tasks.items():
            dag.setdefault(task_key, list(task_meta.depends_on))

        job_meta = JobMeta(
            fn=fn,
            name=job_name,
            params=effective_params,
            cluster=cluster,
            libraries=libraries,
            dag=dag,
            dag_edges=dag_edges,
            all_partitions_edges=all_partitions_edges,
            sdk_config=dict(kwargs),
            for_each_tasks=for_each_tasks,
            backfill=backfill,
        )
        _JOB_REGISTRY[job_name] = job_meta

        @functools.wraps(fn)
        def wrapper(*args, **inner_kwargs):
            return fn(*args, **inner_kwargs)

        wrapper._job_meta = job_meta  # type: ignore[attr-defined]
        return wrapper

    # Support both bare ``@job`` and parameterised ``@job(...)`` usage.
    return _decorate if fn is None else _decorate(fn)

databricks_bundle_decorators.decorators.job_cluster(name, **kwargs)

Register a reusable job-cluster configuration.

Cluster spec fields (spark_version, node_type_id, num_workers, etc.) are passed as keyword arguments and forwarded directly to the databricks.bundles.jobs.ClusterSpec constructor at deploy time. The cluster is ephemeral: created when the job starts and torn down when it finishes.

Returns a ClusterMeta object that should be passed directly to @job(cluster=…).

Parameters:

Name Type Description Default
name str

Logical name for this cluster configuration.

required
**kwargs Unpack[ClusterConfig]

Any SDK-native ClusterSpec fields (e.g. spark_version, node_type_id, num_workers). See ClusterConfig for the full list of supported fields.

{}
Source code in src/databricks_bundle_decorators/decorators.py
def job_cluster(
    name: str,
    **kwargs: Unpack[ClusterConfig],
) -> ClusterMeta:
    """Register a reusable job-cluster configuration.

    All keyword arguments (``spark_version``, ``node_type_id``,
    ``num_workers``, etc.) are forwarded untouched to the
    ``databricks.bundles.jobs.ClusterSpec`` constructor at deploy time.
    The resulting cluster is ephemeral — spun up when the job starts
    and torn down when it finishes.

    Parameters
    ----------
    name:
        Logical name for this cluster configuration.
    **kwargs:
        Any SDK-native ``ClusterSpec`` fields (e.g. ``spark_version``,
        ``node_type_id``, ``num_workers``).  See `ClusterConfig` for
        the full list of supported fields.

    Returns
    -------
    A `ClusterMeta` to pass directly to ``@job(cluster=…)``.
    """
    cluster_meta = ClusterMeta(name=name, spec={**kwargs})
    _register_unique(_CLUSTER_REGISTRY, name, cluster_meta, "job_cluster")
    return cluster_meta

databricks_bundle_decorators.context.params = _Params() module-attribute