def generate_resources(
    package_name: str = "databricks_bundle_decorators",
    default_libraries: list[object] | None = None,
    app_resource_key: str | None = None,
    app_permission: str = "CAN_VIEW",
) -> dict:
    """Build ``{resource_key: Job}`` from the global registries.

    One ``Job`` is produced per entry in ``_JOB_REGISTRY``.  Each key of
    the job's DAG becomes a ``Task`` running the ``dbxdec-run`` wheel
    entry point (wrapped in a ``ForEachTask`` when the task is registered
    as a for-each task).  The registries are only read: the job-level
    ``sdk_config`` is copied before the backfill tag is merged into it.

    Parameters
    ----------
    package_name:
        The Python package name used in ``PythonWheelTask``.  Must match
        the ``[project.name]`` in *pyproject.toml*.
    default_libraries:
        Libraries attached to every task whose ``@job`` does *not* set
        ``libraries`` explicitly.  When ``None`` (the default), uses
        ``[Library(whl="dist/*.whl")]``.  Pass an empty list to suppress
        the default wheel library.
    app_resource_key:
        When set, each generated ``Job`` is given a ``permissions``
        entry that grants the app's service principal ``CAN_VIEW``
        (or the level specified by *app_permission*).  The value must
        be the bundle resource key of the app (e.g.
        ``"my_app_observability"``).  This works around
        `databricks/cli#4309 <https://github.com/databricks/cli/issues/4309>`_
        where ``bundle deploy`` wipes app-granted permissions.
    app_permission:
        Permission level to grant the app service principal.
        Defaults to ``"CAN_VIEW"``.  Only used when
        *app_resource_key* is set.

    Returns
    -------
    dict
        Mapping of job name to the generated ``Job``, one entry per job
        found in ``_JOB_REGISTRY``.
    """
    fallback_libraries: list[object] = (
        [Library(whl="dist/*.whl")] if default_libraries is None else default_libraries
    )

    jobs: dict[str, Job] = {}
    for job_name, job_meta in _JOB_REGISTRY.items():
        # ----- job-wide values, resolved once rather than per task --------
        job_cluster_key = job_meta.cluster.name if job_meta.cluster else None
        if job_meta.libraries is None:
            task_libraries = fallback_libraries
        else:
            # An explicitly empty job-level list means "set no libraries".
            task_libraries = job_meta.libraries or None

        tasks: list[Task] = []
        for task_key, upstream_keys in job_meta.dag.items():
            depends_on = [TaskDependency(task_key=uk) for uk in upstream_keys]

            # ----- named_parameters sent to the wheel entry-point ----------
            named_params: dict[str, str] = {
                "__job_name__": job_name,
                "__task_key__": task_key,
                "__run_id__": "{{job.run_id}}",
            }
            # Upstream edge info so the runtime can invoke IoManager.read()
            for param_name, upstream_task in job_meta.dag_edges.get(
                task_key, {}
            ).items():
                named_params[f"__upstream__{param_name}"] = upstream_task
            # All-partitions flags for upstream reads
            for param_name in job_meta.all_partitions_edges.get(task_key, set()):
                named_params[f"__all_partitions__{param_name}"] = "true"
            # Forward every job-level parameter to the task CLI
            for param_name in job_meta.params:
                named_params[param_name] = (
                    "{{" + f'job.parameters["{param_name}"]' + "}}"
                )

            # ----- per-task SDK config (max_retries, timeout, etc.) --------
            task_sdk_config = _TASK_REGISTRY[f"{job_name}.{task_key}"].sdk_config

            # ----- check if this is a for-each task ------------------------
            fe_meta = job_meta.for_each_tasks.get(task_key)
            if fe_meta is not None:
                # Add __for_each_input__ so runtime knows to inject it
                named_params["__for_each_input__"] = "{{input}}"

            # Keyword arguments shared by a regular task and by the inner
            # task of a for-each wrapper.  ``task_sdk_config`` is spread
            # after the explicit keys so it can override them, and
            # ``libraries`` is assigned last so it wins over the SDK config.
            wheel_kwargs: dict[str, Any] = {
                "job_cluster_key": job_cluster_key,
                "python_wheel_task": PythonWheelTask(
                    package_name=package_name,
                    entry_point="dbxdec-run",
                    named_parameters=named_params,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
                ),
                **task_sdk_config,
            }
            if task_libraries is not None:
                wheel_kwargs["libraries"] = task_libraries

            if fe_meta is None:
                # ---- regular task -----------------------------------------
                task_kwargs: dict[str, Any] = {
                    "task_key": task_key,
                    "depends_on": depends_on,
                    **wheel_kwargs,
                }
                tasks.append(Task(**task_kwargs))  # dynamic kwargs
                continue

            # ---- for-each task: outer wrapper with inner task -------------
            # Build the inner Task (the one that runs per iteration)
            inner_task_kwargs: dict[str, Any] = {
                "task_key": f"{task_key}_inner",
                **wheel_kwargs,
            }
            inner_task = Task(**inner_task_kwargs)

            # Determine the inputs expression: either a reference to an
            # upstream task value or the static inputs encoded as JSON.
            if fe_meta.inputs_task_key is not None:
                value_key = fe_meta.inputs_value_key
                inputs_expr = (
                    f"{{{{tasks.{fe_meta.inputs_task_key}.values.{value_key}}}}}"
                )
            else:
                inputs_expr = json.dumps(fe_meta.static_inputs)

            tasks.append(
                Task(
                    task_key=task_key,
                    depends_on=depends_on,  # ty: ignore[invalid-argument-type]
                    for_each_task=ForEachTask(
                        inputs=inputs_expr,
                        task=inner_task,
                        concurrency=fe_meta.concurrency,
                    ),
                )
            )

        # ----- job clusters -----------------------------------------------
        job_clusters: list[JobCluster] = []
        if job_meta.cluster is not None:
            job_clusters.append(
                JobCluster(
                    job_cluster_key=job_meta.cluster.name,
                    new_cluster=ClusterSpec.from_dict(
                        job_meta.cluster.spec  # ty: ignore[invalid-argument-type]
                    ),  # typed as ClusterSpecDict
                )
            )

        # ----- parameters -------------------------------------------------
        parameters = [
            JobParameterDefinition(name=k, default=v)
            for k, v in job_meta.params.items()
        ]

        # ----- backfill tag -----------------------------------------------
        # Work on a shallow copy so the tag is added to this Job only and
        # the registry's own ``sdk_config`` is left untouched.
        job_sdk_config: dict[str, Any] = dict(job_meta.sdk_config)
        if job_meta.backfill is not None:
            existing_tags: dict[str, str] = job_sdk_config.get("tags") or {}
            job_sdk_config["tags"] = {
                **existing_tags,
                BACKFILL_TAG: _serialize_backfill_tag(job_meta.backfill),
            }

        # ----- app permissions --------------------------------------------
        # When an app resource key is set, grant the app's service
        # principal permission on each job so that ``bundle deploy``
        # preserves the ACL.  See https://github.com/databricks/cli/issues/4309
        permissions: list[JobPermission] = []
        if app_resource_key is not None:
            sp_ref = Variable(
                path=f"resources.apps.{app_resource_key}.service_principal_client_id",
                type=str,
            )
            permissions.append(
                JobPermission(
                    level=JobPermissionLevel(app_permission),
                    service_principal_name=sp_ref,
                )
            )

        jobs[job_name] = Job(
            name=job_name,
            tasks=tasks,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
            parameters=parameters,  # ty: ignore[invalid-argument-type]
            job_clusters=job_clusters,  # SDK Variable wrappers  # ty: ignore[invalid-argument-type]
            permissions=permissions,  # ty: ignore[invalid-argument-type]
            **job_sdk_config,
        )
    return jobs