Skip to content

Codegen

databricks_bundle_decorators.codegen

Convert registries into databricks.bundles.jobs resource objects.

Called at deploy time by the resource loader. Reads the global registries populated by @task, @job_cluster, @job, and @for_each_task decorators and produces Job dataclass instances that the Databricks CLI serialises into the bundle configuration.

generate_resources(package_name='databricks_bundle_decorators', default_libraries=None, app_resource_key=None, app_permission='CAN_VIEW')

Build {resource_key: Job} from the global registries.

Parameters:

Name Type Description Default
package_name str

The Python package name used in PythonWheelTask. Must match the [project.name] in pyproject.toml.

'databricks_bundle_decorators'
default_libraries list[object] | None

Libraries attached to every task whose @job does not set libraries explicitly. When None (the default), uses [Library(whl="dist/*.whl")]. Pass an empty list to suppress the default wheel library.

None
app_resource_key str | None

When set, each generated Job is given a permissions entry that grants the app's service principal CAN_VIEW (or the level specified by app_permission). The value must be the bundle resource key of the app (e.g. "my_app_observability"). This works around databricks/cli#4309 (https://github.com/databricks/cli/issues/4309), where bundle deploy wipes app-granted permissions.

None
app_permission str

Permission level to grant the app service principal. Defaults to "CAN_VIEW". Only used when app_resource_key is set.

'CAN_VIEW'
Source code in src/databricks_bundle_decorators/codegen.py
def generate_resources(
    package_name: str = "databricks_bundle_decorators",
    default_libraries: list[object] | None = None,
    app_resource_key: str | None = None,
    app_permission: str = "CAN_VIEW",
) -> dict:
    """Build ``{resource_key: Job}`` from the global registries.

    Every job in the job registry is turned into one ``Job`` object whose
    tasks all run the ``dbxdec-run`` wheel entry point.  The registries
    themselves are only read: per-job SDK config is copied before the
    backfill tag is merged in, so repeated calls see unmodified metadata.

    Parameters
    ----------
    package_name:
        The Python package name used in ``PythonWheelTask``.  Must match
        the ``[project.name]`` in *pyproject.toml*.
    default_libraries:
        Libraries attached to every task whose ``@job`` does *not* set
        ``libraries`` explicitly.  When ``None`` (the default), uses
        ``[Library(whl="dist/*.whl")]``.  Pass an empty list to suppress
        the default wheel library.
    app_resource_key:
        When set, each generated ``Job`` is given a ``permissions``
        entry that grants the app's service principal ``CAN_VIEW``
        (or the level specified by *app_permission*).  The value must
        be the bundle resource key of the app (e.g.
        ``"my_app_observability"``).  This works around
        `databricks/cli#4309 <https://github.com/databricks/cli/issues/4309>`_
        where ``bundle deploy`` wipes app-granted permissions.
    app_permission:
        Permission level to grant the app service principal.
        Defaults to ``"CAN_VIEW"``.  Only used when
        *app_resource_key* is set.

    Returns
    -------
    dict
        One ``Job`` per entry of the job registry, keyed by job name.

    Raises
    ------
    KeyError
        Presumably raised when a task key of a job's DAG has no
        ``"<job_name>.<task_key>"`` entry in the task registry (assumes
        the registry is a plain mapping -- confirm).

    Notes
    -----
    ``JobPermissionLevel(app_permission)`` is evaluated once per job when
    *app_resource_key* is set; an unknown level is expected to be rejected
    there (assumes an enum-style type -- confirm).
    """
    wheel_libraries: list[object] = (
        [Library(whl="dist/*.whl")] if default_libraries is None else default_libraries
    )

    jobs: dict[str, Job] = {}

    for job_name, job_meta in _JOB_REGISTRY.items():
        # ----- per-job invariants (identical for every task) --------------
        cluster_key = job_meta.cluster.name if job_meta.cluster else None

        # Use job-level libraries if explicitly set, else default wheel.
        # An explicitly empty job-level list means "attach no libraries".
        if job_meta.libraries is not None:
            task_libraries = job_meta.libraries or None
        else:
            task_libraries = wheel_libraries

        tasks: list[Task] = []

        for task_key, upstream_keys in job_meta.dag.items():
            depends_on = [TaskDependency(task_key=uk) for uk in upstream_keys]

            # ----- named_parameters sent to the wheel entry-point ----------
            named_params: dict[str, str] = {
                "__job_name__": job_name,
                "__task_key__": task_key,
                "__run_id__": "{{job.run_id}}",
            }

            # Upstream edge info so the runtime can invoke IoManager.read()
            for param_name, upstream_task in job_meta.dag_edges.get(
                task_key, {}
            ).items():
                named_params[f"__upstream__{param_name}"] = upstream_task

            # All-partitions flags for upstream reads
            for param_name in job_meta.all_partitions_edges.get(task_key, set()):
                named_params[f"__all_partitions__{param_name}"] = "true"

            # Forward every job-level parameter to the task CLI
            for param_name in job_meta.params:
                named_params[param_name] = (
                    "{{" + f'job.parameters["{param_name}"]' + "}}"
                )

            # ----- per-task SDK config (max_retries, timeout, etc.) -----
            task_sdk_config = _TASK_REGISTRY[f"{job_name}.{task_key}"].sdk_config

            # ----- check if this is a for-each task -----------------------
            fe_meta = job_meta.for_each_tasks.get(task_key)
            if fe_meta is not None:
                # Add __for_each_input__ so runtime knows to inject it
                named_params["__for_each_input__"] = "{{input}}"

            # Both task flavours run the same wheel entry point; only the
            # wrapping differs.
            wheel_task = PythonWheelTask(
                package_name=package_name,
                entry_point="dbxdec-run",
                named_parameters=named_params,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
            )

            if fe_meta is None:
                # ---- regular task ----------------------------------------
                # task_sdk_config is spread last so it keeps precedence over
                # the generated keys, exactly as before.
                task_kwargs: dict[str, Any] = {
                    "task_key": task_key,
                    "depends_on": depends_on,
                    "job_cluster_key": cluster_key,
                    "python_wheel_task": wheel_task,
                    **task_sdk_config,
                }
                if task_libraries is not None:
                    task_kwargs["libraries"] = task_libraries

                tasks.append(Task(**task_kwargs))  # dynamic kwargs
                continue

            # ---- for-each task: outer wrapper with inner task ------------
            # The inner Task is the one that runs per iteration; dependencies
            # are declared on the outer wrapper only.
            inner_task_kwargs: dict[str, Any] = {
                "task_key": f"{task_key}_inner",
                "job_cluster_key": cluster_key,
                "python_wheel_task": wheel_task,
                **task_sdk_config,
            }
            if task_libraries is not None:
                inner_task_kwargs["libraries"] = task_libraries

            inner_task = Task(**inner_task_kwargs)

            # Inputs come either from an upstream task value reference or
            # from a static list serialised as JSON.
            if fe_meta.inputs_task_key is not None:
                value_key = fe_meta.inputs_value_key
                inputs_expr = (
                    f"{{{{tasks.{fe_meta.inputs_task_key}.values.{value_key}}}}}"
                )
            else:
                inputs_expr = json.dumps(fe_meta.static_inputs)

            for_each = ForEachTask(
                inputs=inputs_expr,
                task=inner_task,
                concurrency=fe_meta.concurrency,
            )

            tasks.append(
                Task(
                    task_key=task_key,
                    depends_on=depends_on,  # ty: ignore[invalid-argument-type]
                    for_each_task=for_each,
                )
            )

        # ----- job clusters -----------------------------------------------
        job_clusters: list[JobCluster] = []
        if job_meta.cluster is not None:
            job_clusters.append(
                JobCluster(
                    job_cluster_key=job_meta.cluster.name,
                    new_cluster=ClusterSpec.from_dict(
                        job_meta.cluster.spec  # ty: ignore[invalid-argument-type]
                    ),  # typed as ClusterSpecDict
                )
            )

        # ----- parameters -------------------------------------------------
        parameters = [
            JobParameterDefinition(name=k, default=v)
            for k, v in job_meta.params.items()
        ]

        # ----- backfill tag -----------------------------------------------
        # Work on a shallow copy: writing the tag into job_meta.sdk_config
        # would mutate the global registry as a side effect of code
        # generation.
        job_sdk_config: dict[str, Any] = dict(job_meta.sdk_config)
        if job_meta.backfill is not None:
            # ``or {}`` also covers an explicit ``tags=None``.
            existing_tags: dict[str, str] = job_sdk_config.get("tags") or {}
            job_sdk_config["tags"] = {
                **existing_tags,
                BACKFILL_TAG: _serialize_backfill_tag(job_meta.backfill),
            }

        # ----- app permissions --------------------------------------------
        # When an app resource key is set, grant the app's service
        # principal permission on each job so that ``bundle deploy``
        # preserves the ACL.  See https://github.com/databricks/cli/issues/4309
        permissions: list[JobPermission] = []
        if app_resource_key is not None:
            sp_ref = Variable(
                path=f"resources.apps.{app_resource_key}.service_principal_client_id",
                type=str,
            )
            level = JobPermissionLevel(app_permission)
            permissions.append(
                JobPermission(level=level, service_principal_name=sp_ref)
            )

        jobs[job_name] = Job(
            name=job_name,
            tasks=tasks,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
            parameters=parameters,  # ty: ignore[invalid-argument-type]
            job_clusters=job_clusters,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
            permissions=permissions,  # ty: ignore[invalid-argument-type]
            **job_sdk_config,
        )

    return jobs