Backfill Definitions

databricks_bundle_decorators.backfill.BackfillDef

Bases: ABC

Base class for backfill definitions.

Subclasses declare the universe of valid backfill keys for enumeration. The dbxdec backfill CLI uses these to generate backfill_key values.

keys(start=None, end=None) abstractmethod

Enumerate concrete backfill key strings.

Parameters:

- start (str | None, default None): Override the start bound (inclusive). Must use the same format as the definition's keys.
- end (str | None, default None): Override the end bound (inclusive). Must use the same format as the definition's keys.
Source code in src/databricks_bundle_decorators/backfill.py
@abstractmethod
def keys(self, start: str | None = None, end: str | None = None) -> list[str]:
    """Enumerate concrete backfill key strings.

    Parameters
    ----------
    start:
        Override the start bound (inclusive).  Must use the same
        format as the definition's keys.
    end:
        Override the end bound (inclusive).  Must use the same
        format as the definition's keys.
    """
    ...

databricks_bundle_decorators.backfill.DailyBackfill(start_date, end_date=None, tz='UTC') dataclass

Bases: BackfillDef

One key per calendar day.

Keys are ISO-8601 dates: YYYY-MM-DD.

Parameters:

- start_date (str, required): First key (inclusive), e.g. "2024-01-01".
- end_date (str | None, default None): Last key (inclusive). Defaults to today in tz.
- tz (str, default 'UTC'): IANA timezone name (e.g. "UTC", "Europe/Berlin"). Used to determine "today" when end_date is omitted.

databricks_bundle_decorators.backfill.WeeklyBackfill(start_date, end_date=None, tz='UTC') dataclass

Bases: BackfillDef

One key per ISO week.

Keys are ISO week dates: YYYY-WNN (e.g. "2024-W03").

The default end_date is the Monday of the current ISO week.

Parameters:

- start_date (str, required): First key (inclusive), e.g. "2024-W01".
- end_date (str | None, default None): Last key (inclusive). Defaults to the current ISO week.
- tz (str, default 'UTC'): IANA timezone name. Used to determine "today" when end_date is omitted.
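The YYYY-WNN format maps directly onto Python's ISO-calendar support. The helpers below are illustration only (not the library's implementation), showing how a week key can be produced from a date and resolved back to its Monday:

```python
from datetime import date, datetime


def week_key(d: date) -> str:
    # ISO year and ISO week number, e.g. 2024-01-15 -> "2024-W03".
    iso = d.isocalendar()
    return f"{iso.year}-W{iso.week:02d}"


def week_monday(key: str) -> date:
    # Parse "YYYY-WNN" back to the Monday of that ISO week
    # (%G/%V/%u are the ISO-week strptime directives).
    return datetime.strptime(key + "-1", "%G-W%V-%u").date()


week_key(date(2024, 1, 15))  # -> "2024-W03"
week_monday("2024-W03")      # -> date(2024, 1, 15)
```

Note that the ISO year can differ from the calendar year near year boundaries: 2023-01-01 falls in week "2022-W52".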

databricks_bundle_decorators.backfill.MonthlyBackfill(start_date, end_date=None, tz='UTC') dataclass

Bases: BackfillDef

One key per calendar month.

Keys are ISO-8601 dates pinned to the first of the month: YYYY-MM-01 (e.g. "2024-01-01").

Parameters:

- start_date (str, required): First key (inclusive), e.g. "2024-01-01".
- end_date (str | None, default None): Last key (inclusive). Defaults to the current month.
- tz (str, default 'UTC'): IANA timezone name. Used to determine "today" when end_date is omitted.
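The "pinned to the first of the month" convention can be sketched with a stdlib enumeration. This is an assumption-labeled illustration of the key format, not the library's code:

```python
from datetime import date


def month_keys(start: str, end: str) -> list[str]:
    # Enumerate "YYYY-MM-01" keys, both bounds inclusive.
    lo, hi = date.fromisoformat(start), date.fromisoformat(end)
    out = []
    y, m = lo.year, lo.month
    while (y, m) <= (hi.year, hi.month):
        out.append(f"{y:04d}-{m:02d}-01")
        # Advance to the next month, rolling over at December.
        y, m = (y + 1, 1) if m == 12 else (y, m + 1)
    return out


month_keys("2023-11-01", "2024-02-01")
# -> ['2023-11-01', '2023-12-01', '2024-01-01', '2024-02-01']
```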

databricks_bundle_decorators.backfill.HourlyBackfill(start_date, end_date=None, tz='UTC') dataclass

Bases: BackfillDef

One key per hour.

Keys are truncated ISO-8601 timestamps: YYYY-MM-DDTHH (e.g. "2024-01-01T00").

All enumeration is performed in the specified timezone (default UTC) so that daylight-saving transitions are handled correctly — hours that don't exist are skipped, and ambiguous hours appear once.

Parameters:

- start_date (str, required): First key (inclusive), e.g. "2024-01-01T00".
- end_date (str | None, default None): Last key (inclusive). Defaults to the current hour in tz.
- tz (str, default 'UTC'): IANA timezone name (e.g. "UTC", "America/New_York"). Defaults to "UTC" to sidestep daylight-saving issues.
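The daylight-saving semantics described above can be reproduced with the stdlib by stepping through UTC instants and rendering each in the target zone; the function below is an illustration of those semantics, not the library's implementation. On 2024-03-10, America/New_York springs forward and the 02:00 local hour does not exist, so its key is skipped:

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def hour_keys(start_utc: datetime, end_utc: datetime, tz: str) -> list[str]:
    # Step one UTC hour at a time and format each instant in the
    # local zone. Nonexistent local hours are skipped automatically;
    # ambiguous (fall-back) hours are deduplicated so each key
    # appears once.
    zone = ZoneInfo(tz)
    out: list[str] = []
    t = start_utc
    while t <= end_utc:
        key = t.astimezone(zone).strftime("%Y-%m-%dT%H")
        if not out or out[-1] != key:
            out.append(key)
        t += timedelta(hours=1)
    return out


hour_keys(
    datetime(2024, 3, 10, 5, tzinfo=timezone.utc),
    datetime(2024, 3, 10, 8, tzinfo=timezone.utc),
    "America/New_York",
)
# -> ['2024-03-10T00', '2024-03-10T01', '2024-03-10T03', '2024-03-10T04']
```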

databricks_bundle_decorators.backfill.StaticBackfill(keys) dataclass

Bases: BackfillDef

A fixed set of backfill keys.

Parameters:

- keys (list[str], required): The complete list of valid backfill keys.
Example

::

StaticBackfill(keys=["us", "eu", "jp"])
Source code in src/databricks_bundle_decorators/backfill.py
def __init__(self, keys: list[str]) -> None:
    # Defensive copy so mutations to the caller's list don't leak.
    object.__setattr__(self, "_keys", list(keys))

databricks_bundle_decorators.backfill.get_backfill_key(*, validate=True)

Return the raw backfill key for the current job run.

Reads the backfill_key job parameter and optionally validates it against the job's BackfillDef boundaries.

For time-based backfills the key is an ISO-8601 date/time string; for StaticBackfill it is one of the declared keys (e.g. "us", "eu").

Parameters:

- validate (bool, default True): When True (the default), verify that the key is valid for the job's BackfillDef. A ValueError is raised if the key is out of range. Ignored when the job has no backfill definition.

Raises:

- RuntimeError: If backfill_key is missing or empty. This indicates the job has no backfill definition and was not started via the backfill CLI.
- ValueError: If validate is True and the backfill key is outside the backfill definition's boundaries.

Returns:

- str: The raw backfill key string.

Source code in src/databricks_bundle_decorators/backfill.py
def get_backfill_key(*, validate: bool = True) -> str:
    """Return the raw backfill key for the current job run.

    Reads the ``backfill_key`` job parameter and optionally validates
    it against the job's `BackfillDef` boundaries.

    For time-based backfills the key is an ISO-8601 date/time string;
    for `StaticBackfill` it is one of the declared keys (e.g.
    ``"us"``, ``"eu"``).

    Parameters
    ----------
    validate:
        When ``True`` (the default), verify that the key is valid for
        the job's `BackfillDef`.  A `ValueError` is raised if the
        key is out of range.  Ignored when the job has no backfill
        definition.

    Raises
    ------
    RuntimeError
        If ``backfill_key`` is missing or empty.  This indicates the
        job has no backfill definition and was not started via the
        backfill CLI.
    ValueError
        If *validate* is ``True`` and the backfill key is outside the
        backfill definition's boundaries.

    Returns
    -------
    str
        The raw backfill key string.
    """
    from databricks_bundle_decorators.context import params

    raw = params.get(BACKFILL_KEY_PARAM, "")
    if not raw:
        raise RuntimeError(
            "backfill_key is not set. "
            "This usually means the job was not invoked with a "
            "backfill_key parameter. Use @job(backfill=...) and "
            "the backfill CLI, or pass backfill_key explicitly."
        )
    if validate:
        job_name: str | None = params.get("__job_name__")
        _validate_backfill_key(raw, job_name)

    return raw

databricks_bundle_decorators.backfill.get_run_logical_date(*, validate=True)

Return the backfill key parsed as a timezone-aware datetime.

Convenience wrapper around get_backfill_key for time-based backfills (DailyBackfill, WeeklyBackfill, etc.). Not suitable for StaticBackfill with non-date keys — use get_backfill_key instead.

Parameters:

- validate (bool, default True): When True (the default), verify that the key is valid for the job's BackfillDef. A ValueError is raised if the key is out of range. Ignored when the job has no backfill definition.

Raises:

- RuntimeError: If backfill_key is missing or empty.
- ValueError: If the key cannot be parsed as an ISO-8601 date/time, or if validate is True and it falls outside the backfill definition's boundaries.

Returns:

- datetime: Timezone-aware datetime representing the backfill key.

Source code in src/databricks_bundle_decorators/backfill.py
def get_run_logical_date(*, validate: bool = True) -> datetime:
    """Return the backfill key parsed as a timezone-aware ``datetime``.

    Convenience wrapper around `get_backfill_key` for time-based
    backfills (`DailyBackfill`, `WeeklyBackfill`, etc.).  Not
    suitable for `StaticBackfill` with non-date keys — use
    `get_backfill_key` instead.

    Parameters
    ----------
    validate:
        When ``True`` (the default), verify that the key is valid for
        the job's `BackfillDef`.  A `ValueError` is raised if the
        key is out of range.  Ignored when the job has no backfill
        definition.

    Raises
    ------
    RuntimeError
        If ``backfill_key`` is missing or empty.
    ValueError
        If the key cannot be parsed as an ISO-8601 date/time, or if
        *validate* is ``True`` and it falls outside the backfill
        definition's boundaries.

    Returns
    -------
    datetime
        Timezone-aware datetime representing the backfill key.
    """
    raw = get_backfill_key(validate=validate)
    return _parse_logical_date_str(raw)
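The parsing step delegated to _parse_logical_date_str is internal; the hypothetical parse_key below only sketches how daily and hourly key strings might map to UTC-aware datetimes, under the assumption that bare dates and truncated timestamps are the two shapes involved:

```python
from datetime import datetime, timezone


def parse_key(raw: str) -> datetime:
    # Hypothetical parser: hourly keys carry a "T" separator
    # ("2024-01-01T05"), daily/monthly keys are bare dates.
    fmt = "%Y-%m-%dT%H" if "T" in raw else "%Y-%m-%d"
    # Attach UTC so the result is always timezone-aware.
    return datetime.strptime(raw, fmt).replace(tzinfo=timezone.utc)


parse_key("2024-01-01")     # -> datetime(2024, 1, 1, tzinfo=timezone.utc)
parse_key("2024-01-01T05")  # -> datetime(2024, 1, 1, 5, tzinfo=timezone.utc)
```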

Cross-partition reads

databricks_bundle_decorators.decorators.all_partitions(proxy)

Wrap a TaskProxy so the downstream task receives all partitions.

Use inside a @job body to indicate that the downstream task should read the entire dataset from the upstream task, across all partitions, rather than filtering to the current backfill_key.

Parameters:

- proxy (TaskProxy, required): A TaskProxy returned by calling a @task-decorated function inside a @job body.

Returns:

- `_AllPartitionsProxy`: A wrapped proxy that records the all-partitions flag on the dependency edge.

Example

::

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def my_pipeline():
    @task(io_manager=io)
    def extract():
        ...

    @task
    def aggregate(data):
        ...

    data = extract()
    aggregate(all_partitions(data))
Source code in src/databricks_bundle_decorators/decorators.py
def all_partitions(proxy: TaskProxy) -> _AllPartitionsProxy:
    """Wrap a `TaskProxy` so the downstream task receives all partitions.

    Use inside a ``@job`` body to indicate that the downstream task
    should read the **entire** dataset from the upstream task, across
    all partitions, rather than filtering to the current ``backfill_key``.

    Parameters
    ----------
    proxy:
        A `TaskProxy` returned by calling a ``@task``-decorated
        function inside a ``@job`` body.

    Returns
    -------
    `_AllPartitionsProxy`
        A wrapped proxy that records the all-partitions flag on the
        dependency edge.

    Example
    -------
    ::

        @job(backfill=DailyBackfill(start_date="2024-01-01"))
        def my_pipeline():
            @task(io_manager=io)
            def extract():
                ...

            @task
            def aggregate(data):
                ...

            data = extract()
            aggregate(all_partitions(data))
    """
    if not isinstance(proxy, TaskProxy):
        raise TypeError(
            f"all_partitions() expects a TaskProxy returned by calling "
            f"a @task-decorated function inside a @job body, "
            f"got {type(proxy).__name__!r}."
        )
    return _AllPartitionsProxy(proxy.task_key)