Registry

Testing with registries

Decorators store metadata in global, mutable registries that persist for the lifetime of the process. In a test suite, metadata registered by one test therefore leaks into the next unless the registries are explicitly cleared. Call reset_registries() at the start of each test:

from databricks_bundle_decorators.registry import reset_registries

class TestMyPipeline:
    def setup_method(self):
        # Runs before every test method, so each test starts with empty registries.
        reset_registries()
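
The leak is easy to reproduce without the library. The sketch below uses a stand-in module-level dict and a hypothetical `register_task` helper (neither is the library's actual code) to show how a key registered in one test stays visible in the next until the registry is cleared.

```python
# Stand-in for a module-level registry; NOT the library's real implementation.
_TASK_REGISTRY: dict[str, object] = {}


def register_task(task_key: str, fn: object) -> None:
    """Hypothetical helper: record fn under task_key in the global registry."""
    _TASK_REGISTRY[task_key] = fn


def reset_registries() -> None:
    """Mirror of the documented behaviour: empty the registry in place."""
    _TASK_REGISTRY.clear()


def first_test() -> list[str]:
    register_task("extract", lambda: None)
    return sorted(_TASK_REGISTRY)


def second_test() -> list[str]:
    register_task("load", lambda: None)
    return sorted(_TASK_REGISTRY)


# Without a reset, the second test also sees the first test's task.
seen_first = first_test()
seen_second_leaky = second_test()

# With a reset before each test, every test starts from an empty registry.
reset_registries()
first_test()
reset_registries()
seen_second_clean = second_test()
```

Here `seen_second_leaky` contains both keys, while `seen_second_clean` contains only the key that the second test registered itself.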

databricks_bundle_decorators.registry

Global registries for tasks, clusters, and jobs.

Decorators populate these registries at import time. The codegen module reads them to produce databricks.bundles.jobs resources at deploy time, and the runtime module reads them to dispatch task execution.

TaskMeta(fn, task_key, io_manager=None, partition_by=None, sdk_config=dict(), depends_on=list()) dataclass

Metadata recorded by the @task decorator.

ClusterMeta(name, spec=dict()) dataclass

Metadata recorded by job_cluster().

TaskValueRef(task_key, key) dataclass

Reference to a specific task-value from an upstream task.

Created via the task_value helper and passed to @for_each_task(inputs=...) to specify which upstream task-value provides the iteration list.

task_key instance-attribute

The task key of the upstream task.

key instance-attribute

The task-value key name (the key argument to set_task_value).

ForEachMeta(inputs_task_key=None, inputs_value_key=None, static_inputs=None, concurrency=None) dataclass

Metadata for a for-each task wrapper.

Recorded by @for_each_task inside a @job body. The outer task iterates over inputs and executes the inner task once per element.

inputs_task_key = None class-attribute instance-attribute

Upstream task whose task-value provides the iteration list. None when a static list is used.

inputs_value_key = None class-attribute instance-attribute

The task-value key name on the upstream task (e.g. "countries"). None when a static list is used.

static_inputs = None class-attribute instance-attribute

A static JSON-serialisable list used when no upstream task supplies the inputs dynamically.

concurrency = None class-attribute instance-attribute

Maximum parallel iterations (maps to ForEachTask.concurrency).
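
The two input modes can be illustrated with stand-in dataclasses that copy the documented field names. The `build_for_each_meta` helper and the `list_countries` task key are hypothetical. The helper shows one plausible way a `TaskValueRef` or a static list could be mapped onto `ForEachMeta`, not how the library does it.

```python
from dataclasses import dataclass
from typing import Any, Optional, Union


@dataclass
class TaskValueRef:
    """Stand-in with the documented fields."""
    task_key: str
    key: str


@dataclass
class ForEachMeta:
    """Stand-in with the documented fields and defaults."""
    inputs_task_key: Optional[str] = None
    inputs_value_key: Optional[str] = None
    static_inputs: Optional[list[Any]] = None
    concurrency: Optional[int] = None


def build_for_each_meta(
    inputs: Union[TaskValueRef, list[Any]],
    concurrency: Optional[int] = None,
) -> ForEachMeta:
    """Hypothetical helper: pick the dynamic or static mode from the input type."""
    if isinstance(inputs, TaskValueRef):
        return ForEachMeta(
            inputs_task_key=inputs.task_key,
            inputs_value_key=inputs.key,
            concurrency=concurrency,
        )
    return ForEachMeta(static_inputs=list(inputs), concurrency=concurrency)


# Dynamic mode: the iteration list comes from an upstream task-value.
dynamic = build_for_each_meta(TaskValueRef("list_countries", "countries"), concurrency=4)

# Static mode: the iteration list is fixed when the job is defined.
static = build_for_each_meta(["NO", "SE", "DK"])
```

In this sketch exactly one of the two modes is populated: the dynamic case leaves `static_inputs` as None, and the static case leaves both `inputs_task_key` and `inputs_value_key` as None.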

JobMeta(fn, name, params=dict(), cluster=None, libraries=None, dag=dict(), dag_edges=dict(), all_partitions_edges=dict(), sdk_config=dict(), for_each_tasks=dict(), backfill=None) dataclass

Metadata recorded by the @job decorator.

backfill = None class-attribute instance-attribute

Backfill definition used for key enumeration. It does not affect runtime behaviour; backfill_key is always available.

DuplicateResourceError

Bases: Exception

Raised when a resource with the same key is registered twice.

reset_registries()

Reset all registries. Useful for testing.

Source code in src/databricks_bundle_decorators/registry.py
def reset_registries() -> None:
    """Reset all registries. Useful for testing."""
    _TASK_REGISTRY.clear()
    _CLUSTER_REGISTRY.clear()
    _JOB_REGISTRY.clear()