Backfill Definitions

databricks_bundle_decorators.backfill.BackfillDef

Bases: ABC

Base class for backfill definitions.

Subclasses declare the universe of valid backfill keys for enumeration. The dbxdec backfill CLI uses these to generate backfill_key values.

keys(start=None, end=None) abstractmethod

Enumerate concrete backfill key strings.

Parameters:

Name Type Description Default
start str | None

Override the start bound (inclusive). Must use the same format as the definition's keys.

None
end str | None

Override the end bound (inclusive). Must use the same format as the definition's keys.

None
Source code in src/databricks_bundle_decorators/backfill.py
@abstractmethod
def keys(self, start: str | None = None, end: str | None = None) -> list[str]:
    """Enumerate concrete backfill key strings.

    Parameters
    ----------
    start:
        Override the start bound (inclusive).  Must use the same
        format as the definition's keys.
    end:
        Override the end bound (inclusive).  Must use the same
        format as the definition's keys.
    """
    ...

current_key()

Return the backfill key for the current point in time.

Used as a fallback when a job with a backfill definition is triggered without an explicit backfill_key (e.g. by a cron schedule or file-arrival trigger).

Time-based subclasses return the key matching "now" in their configured timezone. StaticBackfill returns None because there is no sensible default.

Source code in src/databricks_bundle_decorators/backfill.py
def current_key(self) -> str | None:
    """Return the backfill key for the current point in time.

    Used as a fallback when a job with a backfill definition is
    triggered without an explicit ``backfill_key`` (e.g. by a cron
    schedule or file-arrival trigger).

    Time-based subclasses return the key matching "now" in their
    configured timezone.  `StaticBackfill` returns ``None``
    because there is no sensible default.
    """
    return None
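
To illustrate the contract described above, here is a minimal sketch of a custom definition. QuarterlyBackfill and its "YYYY-QN" key format are hypothetical, and the BackfillDef class in the sketch is a local stand-in that mirrors only the two documented methods so the snippet runs without the package installed. In a real project you would subclass the actual databricks_bundle_decorators.backfill.BackfillDef instead.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class BackfillDef(ABC):
    """Local stand-in mirroring the documented interface (not the real class)."""

    @abstractmethod
    def keys(self, start: str | None = None, end: str | None = None) -> list[str]: ...

    def current_key(self) -> str | None:
        return None


class QuarterlyBackfill(BackfillDef):
    """Hypothetical definition with one key per quarter, e.g. "2024-Q1"."""

    def __init__(self, start: str, end: str) -> None:
        self.start = start
        self.end = end

    @staticmethod
    def _index(key: str) -> int:
        # Map "YYYY-QN" to a running quarter count so ranges are easy to walk.
        year, quarter = key.split("-Q")
        return int(year) * 4 + int(quarter) - 1

    def keys(self, start: str | None = None, end: str | None = None) -> list[str]:
        # Both bounds are inclusive; overrides use the same key format.
        lo = self._index(start or self.start)
        hi = self._index(end or self.end)
        return [f"{i // 4}-Q{i % 4 + 1}" for i in range(lo, hi + 1)]


definition = QuarterlyBackfill("2023-Q3", "2024-Q2")
print(definition.keys())
print(definition.keys(end="2023-Q4"))
```

Because the sketch does not override current_key, it inherits the base behaviour and returns None, the same fallback the documentation describes for StaticBackfill.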

databricks_bundle_decorators.backfill.DailyBackfill(start_date, end_date=None, tz='UTC', lookback=0, collect_schedule_gaps=False, data_lag=0) dataclass

Bases: BackfillDef

One key per calendar day.

Keys are ISO-8601 dates: YYYY-MM-DD.

Parameters:

Name Type Description Default
start_date str

First key (inclusive), e.g. "2024-01-01".

required
end_date str | None

Last key (inclusive). Defaults to today in tz.

None
tz str

IANA timezone name (e.g. "UTC", "Europe/Berlin"). Used to determine "today" when end_date is omitted.

'UTC'
lookback int

Number of additional prior keys to include. For example, lookback=2 with key "2026-01-08" yields ["2026-01-06", "2026-01-07", "2026-01-08"]. Applies in all run modes (scheduled and explicit backfill).

0
collect_schedule_gaps bool

When True, get_backfill_keys also returns keys for days between the previous cron fire date and the current key. Only applies to auto-derived keys (scheduled runs); bypassed during explicit dbxdec backfill --keys invocations.

False
data_lag int

Number of periods to subtract from the default end bound. Use data_lag=1 when the data source provides data for T-1 (yesterday) rather than T (today). Only affects the default end when end_date is None and no explicit --end is passed to the CLI. Also shifts current_key backwards by the same amount.

0

current_key()

Today's date in the configured timezone, shifted by data_lag.

Source code in src/databricks_bundle_decorators/backfill.py
def current_key(self) -> str:
    """Today's date in the configured timezone, shifted by `data_lag`."""
    d = whenever.ZonedDateTime.now(self.tz).date()
    if self.data_lag:
        d = d.subtract(days=self.data_lag)
    return d.format(self._FMT)
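
The lookback and data_lag arithmetic for daily keys can be sketched with the standard library alone. This is an illustration, not the package's implementation: the source uses the whenever library, and the helper names below are hypothetical.

```python
from datetime import date, timedelta


def daily_keys_with_lookback(key: str, lookback: int) -> list[str]:
    """Return the `lookback` prior daily keys followed by `key` itself."""
    day = date.fromisoformat(key)
    return [(day - timedelta(days=n)).isoformat() for n in range(lookback, -1, -1)]


def daily_current_key(today: date, data_lag: int = 0) -> str:
    """Shift "today" back by `data_lag` days and format it as YYYY-MM-DD."""
    return (today - timedelta(days=data_lag)).isoformat()


print(daily_keys_with_lookback("2026-01-08", 2))
print(daily_current_key(date(2026, 1, 8), data_lag=1))
```

The first call reproduces the lookback=2 example from the parameter description. The second shows data_lag=1 moving the derived key to the previous day.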

databricks_bundle_decorators.backfill.WeeklyBackfill(start_date, end_date=None, tz='UTC', lookback=0, collect_schedule_gaps=False, data_lag=0) dataclass

Bases: BackfillDef

One key per ISO week.

Keys are ISO week dates: YYYY-WNN (e.g. "2024-W03").

The default end_date is the Monday of the current ISO week.

Parameters:

Name Type Description Default
start_date str

First key (inclusive), e.g. "2024-W01".

required
end_date str | None

Last key (inclusive). Defaults to the current ISO week.

None
tz str

IANA timezone name. Used to determine "today" when end_date is omitted.

'UTC'
lookback int

Number of additional prior keys (weeks) to include.

0
collect_schedule_gaps bool

When True, get_backfill_keys also returns keys for weeks between the previous cron fire and the current key.

False
data_lag int

Number of periods (weeks) to subtract from the default end bound. Use data_lag=1 when data arrives one week late. Also shifts current_key backwards.

0

current_key()

Current ISO week in the configured timezone, shifted by data_lag.

Source code in src/databricks_bundle_decorators/backfill.py
def current_key(self) -> str:
    """Current ISO week in the configured timezone, shifted by `data_lag`."""
    today = whenever.ZonedDateTime.now(self.tz).date()
    if self.data_lag:
        today = today.subtract(weeks=self.data_lag)
    return self._fmt_iso_week(today)
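
The private _fmt_iso_week helper is not shown on this page. As a rough sketch of what a YYYY-WNN formatter could look like, the following uses date.isocalendar() from the standard library. The function names are hypothetical and the real helper may differ.

```python
from datetime import date, timedelta


def iso_week_key(day: date) -> str:
    """Format a date as an ISO week key such as "2024-W03"."""
    # isocalendar() yields the ISO year, which can differ from day.year
    # in the first and last days of a calendar year.
    iso_year, iso_week, _weekday = day.isocalendar()
    return f"{iso_year}-W{iso_week:02d}"


def weekly_current_key(today: date, data_lag: int = 0) -> str:
    """Shift "today" back by `data_lag` weeks, then format the ISO week."""
    return iso_week_key(today - timedelta(weeks=data_lag))


print(iso_week_key(date(2024, 1, 15)))
print(iso_week_key(date(2021, 1, 1)))
print(weekly_current_key(date(2024, 1, 15), data_lag=1))
```

The second call shows why the ISO year matters: 1 January 2021 belongs to week 53 of ISO year 2020, so formatting with the calendar year would produce a key for the wrong week.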

databricks_bundle_decorators.backfill.MonthlyBackfill(start_date, end_date=None, tz='UTC', lookback=0, collect_schedule_gaps=False, data_lag=0) dataclass

Bases: BackfillDef

One key per calendar month.

Keys are ISO-8601 dates pinned to the first of the month: YYYY-MM-01 (e.g. "2024-01-01").

Parameters:

Name Type Description Default
start_date str

First key (inclusive), e.g. "2024-01-01".

required
end_date str | None

Last key (inclusive). Defaults to the current month.

None
tz str

IANA timezone name. Used to determine "today" when end_date is omitted.

'UTC'
lookback int

Number of additional prior keys (months) to include.

0
collect_schedule_gaps bool

When True, get_backfill_keys also returns keys for months between the previous cron fire and the current key.

False
data_lag int

Number of periods (months) to subtract from the default end bound. Use data_lag=1 when data arrives one month late. Also shifts current_key backwards.

0

current_key()

First day of the current month in the configured timezone, shifted by data_lag.

Source code in src/databricks_bundle_decorators/backfill.py
def current_key(self) -> str:
    """First day of the current month in the configured timezone, shifted by `data_lag`."""
    today = whenever.ZonedDateTime.now(self.tz).date()
    if self.data_lag:
        today = today.subtract(months=self.data_lag)
    return today.replace(day=1).format(self._FMT)
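
The standard library has no month subtraction, so a stdlib-only sketch of the same idea counts months explicitly. The helper name is hypothetical; the package itself relies on the whenever library as shown in the source above.

```python
from datetime import date


def monthly_current_key(today: date, data_lag: int = 0) -> str:
    """Return the first-of-month key for `today`, shifted back by `data_lag` months."""
    # Work on a running month count so the shift rolls over year boundaries.
    index = today.year * 12 + (today.month - 1) - data_lag
    year, month_zero_based = divmod(index, 12)
    return date(year, month_zero_based + 1, 1).isoformat()


print(monthly_current_key(date(2024, 3, 31), data_lag=1))
print(monthly_current_key(date(2024, 1, 15), data_lag=2))
```

Pinning the result to day 1 means the day of the month in the input never affects the key, including end-of-month dates such as 31 March.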

databricks_bundle_decorators.backfill.HourlyBackfill(start_date, end_date=None, tz='UTC', lookback=0, collect_schedule_gaps=False, data_lag=0) dataclass

Bases: BackfillDef

One key per hour.

Keys are truncated ISO-8601 timestamps: YYYY-MM-DDTHH (e.g. "2024-01-01T00").

All enumeration is performed in the specified timezone (default UTC) so that daylight-saving transitions are handled correctly — hours that don't exist are skipped, and ambiguous hours appear once.

Parameters:

Name Type Description Default
start_date str

First key (inclusive), e.g. "2024-01-01T00".

required
end_date str | None

Last key (inclusive). Defaults to the current hour in tz.

None
tz str

IANA timezone name (e.g. "UTC", "America/New_York"). Defaults to "UTC" to sidestep daylight-saving issues.

'UTC'
lookback int

Number of additional prior keys (hours) to include.

0
collect_schedule_gaps bool

When True, get_backfill_keys also returns keys for hours between the previous cron fire and the current key.

False
data_lag int

Number of periods (hours) to subtract from the default end bound. Use data_lag=1 when data arrives one hour late. Also shifts current_key backwards.

0

current_key()

Current hour in the configured timezone, shifted by data_lag.

Source code in src/databricks_bundle_decorators/backfill.py
def current_key(self) -> str:
    """Current hour in the configured timezone, shifted by `data_lag`."""
    now = whenever.ZonedDateTime.now(self.tz)
    now = now.replace(minute=0, second=0, nanosecond=0)
    if self.data_lag:
        now = now.subtract(hours=self.data_lag)
    return now.format(self._FMT)
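
A stdlib-only sketch of the truncate-then-shift logic, restricted to UTC to keep it free of daylight-saving concerns. The helper name is hypothetical, and the sketch uses datetime rather than the whenever types used by the package.

```python
from datetime import datetime, timedelta, timezone


def hourly_current_key(now: datetime, data_lag: int = 0) -> str:
    """Truncate `now` to the hour, shift back by `data_lag` hours, format as YYYY-MM-DDTHH."""
    truncated = now.replace(minute=0, second=0, microsecond=0)
    return (truncated - timedelta(hours=data_lag)).strftime("%Y-%m-%dT%H")


now = datetime(2024, 1, 1, 0, 42, tzinfo=timezone.utc)
print(hourly_current_key(now))
print(hourly_current_key(now, data_lag=1))
```

With data_lag=1 the key crosses midnight and lands on the last hour of the previous day, which is the behaviour to expect when a source publishes each hour one hour late.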

databricks_bundle_decorators.backfill.StaticBackfill(keys) dataclass

Bases: BackfillDef

A fixed set of backfill keys.

Parameters:

Name Type Description Default
keys list[str]

The complete list of valid backfill keys.

required
Example

StaticBackfill(keys=["us", "eu", "jp"])
Source code in src/databricks_bundle_decorators/backfill.py
def __init__(self, keys: list[str]) -> None:
    # Defensive copy so mutations to the caller's list don't leak.
    object.__setattr__(self, "_keys", list(keys))
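
The defensive copy in the constructor can be demonstrated in isolation. FixedKeys below is a hypothetical stand-in, written on the assumption that the real class is a frozen dataclass (the use of object.__setattr__ suggests this, but the page does not confirm it).

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True, init=False)
class FixedKeys:
    """Stand-in for a frozen dataclass holding a fixed list of keys."""

    _keys: list

    def __init__(self, keys: list) -> None:
        # Frozen dataclasses block normal attribute assignment, hence object.__setattr__.
        # list(keys) copies the input so later changes to the caller's list are ignored.
        object.__setattr__(self, "_keys", list(keys))

    def keys(self) -> list:
        return list(self._keys)


regions = ["us", "eu"]
fixed = FixedKeys(regions)
regions.append("jp")  # mutate the caller's list after construction
print(fixed.keys())   # the stored keys are unaffected

try:
    fixed._keys = []  # rejected because the dataclass is frozen
except FrozenInstanceError as exc:
    print(type(exc).__name__)
```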

databricks_bundle_decorators.backfill.get_backfill_key(*, validate=True)

Return the raw backfill key for the current job run.

Reads the backfill_key job parameter and optionally validates it against the job's BackfillDef boundaries.

When the parameter is missing or empty and the job has a time-based BackfillDef, the key is auto-derived from the current time (e.g. today's date for DailyBackfill) and a warning is logged. This allows cron-triggered and file-arrival runs to work without explicitly supplying the key.

For time-based backfills the key is an ISO-8601 date/time string; for StaticBackfill it is one of the declared keys (e.g. "us", "eu").

Parameters:

Name Type Description Default
validate bool

When True (the default), verify that the key is valid for the job's BackfillDef. A ValueError is raised if the key is out of range. Ignored when the job has no backfill definition.

True

Raises:

Type Description
RuntimeError

If backfill_key is missing or empty and no automatic key can be derived (e.g. the job has no backfill definition, or uses StaticBackfill).

ValueError

If validate is True and the backfill key is outside the backfill definition's boundaries.

Returns:

Type Description
str

The raw backfill key string.

Source code in src/databricks_bundle_decorators/backfill.py
def get_backfill_key(*, validate: bool = True) -> str:
    """Return the raw backfill key for the current job run.

    Reads the ``backfill_key`` job parameter and optionally validates
    it against the job's `BackfillDef` boundaries.

    When the parameter is missing or empty **and** the job has a
    time-based `BackfillDef`, the key is auto-derived from the
    current time (e.g. today's date for `DailyBackfill`) and a
    warning is logged.  This allows cron-triggered and file-arrival
    runs to work without explicitly supplying the key.

    For time-based backfills the key is an ISO-8601 date/time string;
    for `StaticBackfill` it is one of the declared keys (e.g.
    ``"us"``, ``"eu"``).

    Parameters
    ----------
    validate:
        When ``True`` (the default), verify that the key is valid for
        the job's `BackfillDef`.  A `ValueError` is raised if the
        key is out of range.  Ignored when the job has no backfill
        definition.

    Raises
    ------
    RuntimeError
        If ``backfill_key`` is missing or empty and no automatic
        key can be derived (e.g. the job has no backfill definition,
        or uses `StaticBackfill`).
    ValueError
        If *validate* is ``True`` and the backfill key is outside the
        backfill definition's boundaries.

    Returns
    -------
    str
        The raw backfill key string.
    """
    raw = params.get(BACKFILL_KEY_PARAM, "")
    if not raw:
        raw = _auto_derive_backfill_key()
    if validate:
        job_name: str | None = params.get("__job_name__")
        _validate_backfill_key(raw, job_name)

    return raw
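
The resolution order described above (explicit parameter first, then an auto-derived key, then validation) can be sketched as a standalone function. Everything here is a simplified model: resolve_backfill_key, its arguments, and the plain membership check are hypothetical, and the parameter name "backfill_key" is assumed from the prose rather than from the BACKFILL_KEY_PARAM constant.

```python
from __future__ import annotations


def resolve_backfill_key(
    params: dict[str, str],
    valid_keys: list[str],
    current_key: str | None,
) -> str:
    """Model of the fallback: explicit key, else auto-derived key, then validation."""
    raw = params.get("backfill_key", "")
    if not raw:
        # No explicit key: fall back to the definition's current key, if it has one.
        if current_key is None:
            raise RuntimeError("backfill_key is missing and no key can be derived")
        raw = current_key
    if raw not in valid_keys:
        raise ValueError(f"backfill key {raw!r} is outside the definition's boundaries")
    return raw


valid = ["2024-01-01", "2024-01-02", "2024-01-03"]
print(resolve_backfill_key({"backfill_key": "2024-01-02"}, valid, "2024-01-03"))
print(resolve_backfill_key({}, valid, "2024-01-03"))  # scheduled run, key derived
```

Passing current_key=None models a StaticBackfill job triggered without a key, which ends in the RuntimeError listed under Raises.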

databricks_bundle_decorators.backfill.get_backfill_keys(*, validate=True)

Return all backfill keys for the current run.

When neither lookback nor collect_schedule_gaps is configured, this returns a single-element list equivalent to [get_backfill_key()].

With lookback=N, the result includes N prior keys plus the current key. This applies in all run modes (scheduled and explicit backfill).

With collect_schedule_gaps=True, keys between the previous cron fire date and the current key are included. This only applies to auto-derived keys (scheduled runs); when backfill_key is explicitly provided (via dbxdec backfill --keys), schedule gap logic is bypassed.

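When both are configured, the result is the sorted union of the two sets. For StaticBackfill jobs, and for runs where the exact-backfill parameter is set (the --exact flag referenced in the source below), no expansion happens and only the primary key is returned.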

Parameters:

Name Type Description Default
validate bool

When True (the default), verify that the primary key is valid for the job's BackfillDef.

True

Returns:

Type Description
list[str]

Sorted list of backfill key strings (ascending).

Source code in src/databricks_bundle_decorators/backfill.py
def get_backfill_keys(*, validate: bool = True) -> list[str]:
    """Return all backfill keys for the current run.

    When neither ``lookback`` nor ``collect_schedule_gaps`` is
    configured, this returns a single-element list equivalent to
    ``[get_backfill_key()]``.

    With ``lookback=N``, the result includes *N* prior keys plus the
    current key.  This applies in **all** run modes (scheduled and
    explicit backfill).

    With ``collect_schedule_gaps=True``, keys between the previous
    cron fire date and the current key are included.  This only
    applies to auto-derived keys (scheduled runs); when
    ``backfill_key`` is explicitly provided (via ``dbxdec backfill
    --keys``), schedule gap logic is bypassed.

    When both are configured the result is the sorted union.

    Parameters
    ----------
    validate:
        When ``True`` (the default), verify that the primary key is
        valid for the job's `BackfillDef`.

    Returns
    -------
    list[str]
        Sorted list of backfill key strings (ascending).
    """
    primary_key = get_backfill_key(validate=validate)

    job_name: str | None = params.get("__job_name__")
    if job_name is None:
        return [primary_key]

    job_meta = _JOB_REGISTRY.get(job_name)
    if job_meta is None or job_meta.backfill is None:
        return [primary_key]

    backfill = job_meta.backfill

    # StaticBackfill has no lookback/schedule_gaps support
    if isinstance(backfill, StaticBackfill):
        return [primary_key]

    # --exact flag: bypass all multi-key expansion
    if params.get(EXACT_BACKFILL_PARAM, "") == "1":
        return [primary_key]

    lookback: int = getattr(backfill, "lookback", 0)
    collect_gaps: bool = getattr(backfill, "collect_schedule_gaps", False)

    if lookback == 0 and not collect_gaps:
        return [primary_key]

    all_keys: set[str] = {primary_key}

    # Lookback: always applies
    if lookback > 0:
        all_keys.update(_compute_lookback_keys(backfill, primary_key, lookback))

    # Schedule gaps: only when key was auto-derived (not explicitly provided)
    if collect_gaps:
        explicitly_provided = bool(params.get(BACKFILL_KEY_PARAM, ""))
        if not explicitly_provided:
            schedule_cron = _get_job_schedule_cron(job_name)
            if schedule_cron is not None:
                all_keys.update(
                    _compute_schedule_gap_keys(backfill, primary_key, schedule_cron)
                )

    result = sorted(all_keys)
    if len(result) > 1:
        _logger.info(
            "Expanded backfill_key %r to %d keys (lookback=%d, collect_schedule_gaps=%s): %s",
            primary_key,
            len(result),
            lookback,
            collect_gaps,
            result,
        )
    return result
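
The sorted-union behaviour can be illustrated for daily keys with a small standalone model. The private helpers _compute_lookback_keys and _compute_schedule_gap_keys are not shown on this page, so the sketch makes an assumption: gap keys are taken to be every day strictly after the previous cron fire date and before the primary key. The function name and its previous_fire argument are hypothetical.

```python
from __future__ import annotations

from datetime import date, timedelta


def expand_daily_keys(
    primary: str,
    lookback: int = 0,
    previous_fire: str | None = None,
) -> list[str]:
    """Model of the expansion: primary key, lookback keys, and schedule-gap keys."""
    end = date.fromisoformat(primary)
    keys = {primary}
    # Lookback: the N keys immediately before the primary key.
    keys.update((end - timedelta(days=n)).isoformat() for n in range(1, lookback + 1))
    # Schedule gaps (assumed semantics): days after the previous fire, before the primary key.
    if previous_fire is not None:
        day = date.fromisoformat(previous_fire) + timedelta(days=1)
        while day < end:
            keys.add(day.isoformat())
            day += timedelta(days=1)
    # ISO-8601 date strings sort lexicographically in chronological order.
    return sorted(keys)


print(expand_daily_keys("2026-01-08", lookback=2, previous_fire="2026-01-04"))
```

Collecting into a set before sorting removes the overlap between the two sources, so a day covered by both lookback and a schedule gap appears once.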

databricks_bundle_decorators.backfill.get_run_logical_date(*, validate=True)

Return the backfill key parsed as a timezone-aware datetime.

Convenience wrapper around get_backfill_key for time-based backfills (DailyBackfill, WeeklyBackfill, etc.). Not suitable for StaticBackfill with non-date keys — use get_backfill_key instead.

Parameters:

Name Type Description Default
validate bool

When True (the default), verify that the key is valid for the job's BackfillDef. A ValueError is raised if the key is out of range. Ignored when the job has no backfill definition.

True

Raises:

Type Description
RuntimeError

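If backfill_key is missing or empty and no automatic key can be derived (this function delegates to get_backfill_key, which auto-derives the key for time-based backfills).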

ValueError

If the key cannot be parsed as an ISO-8601 date/time, or if validate is True and it falls outside the backfill definition's boundaries.

Returns:

Type Description
datetime

Timezone-aware datetime representing the backfill key.

Source code in src/databricks_bundle_decorators/backfill.py
def get_run_logical_date(*, validate: bool = True) -> datetime:
    """Return the backfill key parsed as a timezone-aware ``datetime``.

    Convenience wrapper around `get_backfill_key` for time-based
    backfills (`DailyBackfill`, `WeeklyBackfill`, etc.).  Not
    suitable for `StaticBackfill` with non-date keys — use
    `get_backfill_key` instead.

    Parameters
    ----------
    validate:
        When ``True`` (the default), verify that the key is valid for
        the job's `BackfillDef`.  A `ValueError` is raised if the
        key is out of range.  Ignored when the job has no backfill
        definition.

    Raises
    ------
    RuntimeError
        If ``backfill_key`` is missing or empty.
    ValueError
        If the key cannot be parsed as an ISO-8601 date/time, or if
        *validate* is ``True`` and it falls outside the backfill
        definition's boundaries.

    Returns
    -------
    datetime
        Timezone-aware datetime representing the backfill key.
    """
    raw = get_backfill_key(validate=validate)
    return _parse_logical_date_str(raw)
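
The private _parse_logical_date_str helper is not shown here. As a rough idea of what parsing daily and hourly keys into an aware datetime could look like, the sketch below uses datetime.strptime. Both the function name and the choice of UTC as the attached timezone are assumptions, and week keys are deliberately left out.

```python
from datetime import datetime, timezone


def parse_logical_date(key: str) -> datetime:
    """Parse "YYYY-MM-DDTHH" or "YYYY-MM-DD" into a UTC-aware datetime (assumed timezone)."""
    for fmt in ("%Y-%m-%dT%H", "%Y-%m-%d"):
        try:
            return datetime.strptime(key, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue  # try the next format
    raise ValueError(f"cannot parse backfill key {key!r} as a date/time")


print(parse_logical_date("2024-01-01"))
print(parse_logical_date("2024-01-01T05"))
```

A non-date key such as "us" falls through both formats and raises ValueError, which is why the documentation directs StaticBackfill users to get_backfill_key.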

Cross-partition reads

databricks_bundle_decorators.decorators.all_partitions(proxy)

Wrap a TaskProxy so the downstream task receives all partitions.

Use inside a @job body to indicate that the downstream task should read the entire dataset from the upstream task, across all partitions, rather than filtering to the current backfill_key.

Parameters:

Name Type Description Default
proxy TaskProxy

A TaskProxy returned by calling a @task-decorated function inside a @job body.

required

Returns:

Type Description
`_AllPartitionsProxy`

A wrapped proxy that records the all-partitions flag on the dependency edge.

Example

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def my_pipeline():
    @task(io_manager=io)
    def extract(): ...

    @task
    def aggregate(data): ...

    data = extract()
    aggregate(all_partitions(data))
Source code in src/databricks_bundle_decorators/decorators.py
def all_partitions(proxy: TaskProxy) -> _AllPartitionsProxy:
    """Wrap a `TaskProxy` so the downstream task receives all partitions.

    Use inside a ``@job`` body to indicate that the downstream task
    should read the **entire** dataset from the upstream task, across
    all partitions, rather than filtering to the current ``backfill_key``.

    Parameters
    ----------
    proxy:
        A `TaskProxy` returned by calling a ``@task``-decorated
        function inside a ``@job`` body.

    Returns
    -------
    `_AllPartitionsProxy`
        A wrapped proxy that records the all-partitions flag on the
        dependency edge.

    Example
    -------
    ::

        @job(backfill=DailyBackfill(start_date="2024-01-01"))
        def my_pipeline():
            @task(io_manager=io)
            def extract(): ...

            @task
            def aggregate(data): ...

            data = extract()
            aggregate(all_partitions(data))
    """
    if not isinstance(proxy, TaskProxy):
        raise TypeError(
            f"all_partitions() expects a TaskProxy returned by calling "
            f"a @task-decorated function inside a @job body, "
            f"got {type(proxy).__name__!r}."
        )
    return _AllPartitionsProxy(proxy.task_key)