Backfilling & Partitioning

Jobs that use @job(backfill=...) get a backfill key — a string job parameter that represents which data slice the run belongs to (similar to Airflow's logical_date or Dagster's partition key). The backfill CLI enumerates these keys for bulk run submission.

On the storage side, IoManagers can write data in a Hive-style partitioned layout (column=value/ directories) using the partition_by parameter on the @task decorator — see Built-in IoManagers for format-specific details.

Quick start

from databricks_bundle_decorators import job, task, DailyBackfill, get_backfill_key
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def daily_pipeline():
    @task(io_manager=io, partition_by="backfill_key")
    def extract():
        key = get_backfill_key()
        return fetch_data(key)  # key is e.g. "2024-01-15"

    @task
    def transform(df):
        print(df.head())

    data = extract()
    transform(data)

All partition_by columns produce Hive-style partitioned output (column=value/ directory layout).
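
As a rough illustration of that layout, the sketch below builds the directory a set of partition values would map to. `hive_partition_path` is a hypothetical helper written for this page, not part of the library; the built-in IoManagers leave the actual layout to Polars or Spark.

```python
from typing import Mapping


def hive_partition_path(base_path: str, partition_values: Mapping[str, str]) -> str:
    """Build a Hive-style directory path: base/col1=val1/col2=val2/."""
    segments = [f"{column}={value}" for column, value in partition_values.items()]
    return "/".join([base_path.rstrip("/"), *segments]) + "/"


# One directory level per partition column, in the order given.
print(hive_partition_path(
    "abfss://lake@acct.dfs.core.windows.net/data",
    {"backfill_key": "2024-01-15"},
))
```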

Partition-scoped overwrite

When partition_by is set, all built-in IoManagers automatically scope their writes to the partitions present in the data being written. An overwrite that contains only region=eu rows will never touch region=us; existing partitions are preserved.

This is critical for backfill safety: running a backfill for 2024-01-15 will not destroy data for any other date.

The mechanism varies by format:

| Format | Mechanism |
| --- | --- |
| Delta (Polars & Spark) | replaceWhere predicate scoped to written partition values |
| Parquet (Spark) | partitionOverwriteMode=dynamic: only written partitions are replaced |
| Parquet / CSV / JSON (Polars) | Hive-style PartitionBy directories: each partition is a separate directory, naturally isolated |
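
To make the Delta row more concrete, here is a minimal sketch of how a replaceWhere-style predicate could be assembled from the partition values found in the outgoing data. `build_replace_where` is a hypothetical helper, not the library's implementation, and the exact predicate the IoManagers generate may differ.

```python
from typing import Mapping, Sequence


def build_replace_where(written: Mapping[str, Sequence[str]]) -> str:
    """Return a SQL predicate matching only the partition values being written."""
    clauses = []
    for column, values in written.items():
        # Sort and de-duplicate so the predicate is deterministic;
        # double any single quotes so values stay valid SQL string literals.
        quoted = ", ".join("'" + v.replace("'", "''") + "'" for v in sorted(set(values)))
        clauses.append(f"{column} IN ({quoted})")
    # Simplification: with several columns this matches the cross product of
    # values, which can be wider than the combinations actually written.
    return " AND ".join(clauses)


print(build_replace_where({"backfill_key": ["2024-01-15"]}))
```

Because the predicate names only the values present in the data, rows for any other partition fall outside it and are left untouched.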

Merge operations are partition-safe by design

When a task returns a DeltaMergeBuilder (PySpark) or DeltaMerge (Polars), the IoManager executes the merge directly — it bypasses the overwrite path entirely. The merge predicate (e.g. t.id = s.id) controls which rows are touched, so other partitions are never at risk. You can safely combine partition_by with merge; the replaceWhere / partitionOverwriteMode mechanisms above only apply to regular DataFrame writes, not merge operations.

See Delta Write Modes & Merge for full merge examples.

Without partition_by

When partition_by is not set, mode="overwrite" replaces the entire table/file as usual. Partition-scoped overwrite only applies when partition_by is configured on the @task.

Auto-filtering (default)

By default (auto_filter=True), all built-in IoManagers automatically push the distinct partition column values written by the producing task to downstream consumers via Databricks task values. Downstream reads are then filtered to exactly the partition values that were written — regardless of column name. This works for backfill_key, custom date columns, categorical columns, and multi-column partitioning alike.

The special column name "backfill_key" has one extra convenience on top of auto-filtering: the IoManager auto-injects a backfill_key column on write (your DataFrame doesn't need to contain it). Any other column name must already exist in the DataFrame.
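
Conceptually, auto-filtering has a producer half and a consumer half. The sketch below models both on plain lists of dicts; the function names are hypothetical, and the real IoManagers work on DataFrames and exchange the values through Databricks task values rather than a Python variable.

```python
from typing import Any

Row = dict[str, Any]


def distinct_partition_values(rows: list[Row], partition_by: list[str]) -> dict[str, list[Any]]:
    """Producer side: collect the distinct values written per partition column."""
    return {col: sorted({row[col] for row in rows}) for col in partition_by}


def filter_to_written(rows: list[Row], written: dict[str, list[Any]]) -> list[Row]:
    """Consumer side: keep only rows whose partition values were written upstream."""
    return [
        row for row in rows
        if all(row[col] in values for col, values in written.items())
    ]


# The table already holds a "us" partition; this run writes only "eu".
table = [{"region": "eu", "revenue": 10}, {"region": "us", "revenue": 20}]
written = distinct_partition_values([{"region": "eu", "revenue": 10}], ["region"])
print(filter_to_written(table, written))
```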

Disabling auto-filtering

Pass auto_filter=False when constructing the IoManager to disable all automatic partition filtering. In this mode, a warning is logged for all partition columns reminding you to filter manually.

This is useful when the list of distinct partition values is large enough to exceed the 48 KB limit that Databricks imposes on task values passed between tasks. Disabling auto-filtering avoids that limit entirely — at the cost of requiring manual filtering in downstream task code.

io = PolarsParquetIoManager(
    base_path="...",
    auto_filter=False,   # no automatic filtering
)
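
If you are unsure whether a job is close to that limit, a rough size estimate can help you decide. The sketch below assumes the values are serialized as compact JSON, which is an assumption made for illustration; the library's actual encoding is not described here, so treat the result as an approximation.

```python
import json

TASK_VALUE_LIMIT_BYTES = 48 * 1024  # the 48 KB limit mentioned above


def fits_in_task_value(written: dict[str, list[str]]) -> bool:
    """Return True if the JSON-encoded partition values stay under the limit."""
    # Assumption: compact JSON encoding, measured in UTF-8 bytes.
    payload = json.dumps(written, separators=(",", ":")).encode("utf-8")
    return len(payload) <= TASK_VALUE_LIMIT_BYTES


print(fits_in_task_value({"backfill_key": ["2024-01-15"]}))
print(fits_in_task_value({"event_id": [f"{i:010d}" for i in range(10_000)]}))
```

A single backfill key is tiny, while a high-cardinality column such as an ID easily exceeds the limit; that second case is where auto_filter=False is the safer choice.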

Using an existing date column

If your dataset already has a date column that maps to the logical date, use partition_by with that column name directly. With the default auto_filter=True, downstream reads are automatically filtered to the written values — no manual filtering needed:

import polars as pl

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def daily_pipeline():
    @task(io_manager=io, partition_by="event_date")
    def extract() -> pl.LazyFrame:
        key = get_backfill_key()
        # The data already contains 'event_date' — no injection needed
        return pl.scan_ndjson(f"s3://raw/{key}/*.jsonl")

    @task
    def transform(df: pl.LazyFrame):
        # df is automatically filtered to the written event_date values
        ...

    data = extract()
    transform(data)

How it works

When @job(backfill=...) is specified, the decorator auto-injects a backfill_key job parameter with an empty default value. At runtime, a non-empty key is passed as a string to all IoManager contexts and is available via get_backfill_key().

Automatic key derivation

When the backfill_key parameter is missing or empty — for example when a job is triggered by a cron schedule, a file-arrival trigger, or a manual Run Now — and the job has a time-based backfill definition (DailyBackfill, WeeklyBackfill, MonthlyBackfill, or HourlyBackfill), the key is automatically derived from the current time in the backfill definition's timezone. A warning is logged when this happens.

| Backfill type | Auto-derived key |
| --- | --- |
| DailyBackfill | Today's date (e.g. 2024-06-15) |
| WeeklyBackfill | Current ISO week (e.g. 2024-W24) |
| MonthlyBackfill | First of the current month (e.g. 2024-06-01) |
| HourlyBackfill | Current hour (e.g. 2024-06-15T14) |

When data_lag is configured, the auto-derived key is shifted backwards by that many periods (e.g. data_lag=1 on a DailyBackfill derives yesterday instead of today).

This means scheduled runs "just work" without requiring the trigger to supply the key explicitly.

StaticBackfill has no sensible default, so runs without an explicit key will still raise RuntimeError.
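
The derivation rules above can be sketched with the standard library alone. `derive_key` and its `kind` argument are hypothetical names used for illustration; the key formats and the data_lag shift follow the table, but the library's own implementation may handle edge cases (such as daylight-saving transitions) differently.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def derive_key(kind: str, now: datetime, tz: str = "UTC", data_lag: int = 0) -> str:
    """Derive a backfill key from an aware `now`, shifted back by `data_lag` periods."""
    zone = ZoneInfo(tz)
    if kind == "hourly":
        # Shift in absolute time first, then convert to local wall-clock time.
        return (now - timedelta(hours=data_lag)).astimezone(zone).strftime("%Y-%m-%dT%H")
    local = now.astimezone(zone)
    if kind == "daily":
        return (local.date() - timedelta(days=data_lag)).isoformat()
    if kind == "weekly":
        iso = (local.date() - timedelta(weeks=data_lag)).isocalendar()
        return f"{iso[0]}-W{iso[1]:02d}"
    if kind == "monthly":
        # Count months since year 0 so the lag can cross year boundaries.
        months = local.year * 12 + (local.month - 1) - data_lag
        return f"{months // 12:04d}-{months % 12 + 1:02d}-01"
    # Mirrors StaticBackfill: there is nothing sensible to derive.
    raise ValueError(f"no automatic key for backfill kind {kind!r}")


now = datetime(2024, 6, 15, 14, 30, tzinfo=timezone.utc)
for kind in ("daily", "weekly", "monthly", "hourly"):
    print(kind, derive_key(kind, now))
```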

Reading the backfill key

Two helpers are available inside task functions:

  • get_backfill_key(): returns the raw key string (e.g. "2024-01-15", "us"). Works with all backfill types.
  • get_run_logical_date(): parses the key as an ISO-8601 datetime. Use for time-based backfills only.

from databricks_bundle_decorators import get_backfill_key, get_run_logical_date

key = get_backfill_key()          # "2024-01-15"
dt  = get_run_logical_date()      # datetime(2024, 1, 15, tzinfo=UTC)

For time-based backfill definitions, the key is auto-derived from the current time when not explicitly provided. A RuntimeError is raised only when no automatic derivation is possible (e.g. StaticBackfill, or no backfill definition at all). See the Backfill Definitions API for full parameter details.
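
If you need similar parsing outside a task (for example in a test), the following sketch converts the documented key formats into datetimes. `parse_logical_date` is a hypothetical stand-in, not the library function; tagging the result as UTC and resolving weekly keys to the Monday of the ISO week are both assumptions.

```python
from datetime import datetime, timezone


def parse_logical_date(key: str) -> datetime:
    """Parse a time-based backfill key into a UTC-tagged datetime."""
    # Hourly (YYYY-MM-DDTHH) and daily/monthly (YYYY-MM-DD) keys.
    for fmt in ("%Y-%m-%dT%H", "%Y-%m-%d"):
        try:
            return datetime.strptime(key, fmt).replace(tzinfo=timezone.utc)
        except ValueError:
            continue
    if "-W" in key:
        # ISO week key (YYYY-WNN): assume the Monday of that week.
        return datetime.strptime(key + "-1", "%G-W%V-%u").replace(tzinfo=timezone.utc)
    raise ValueError(f"not a time-based backfill key: {key!r}")


print(parse_logical_date("2024-01-15"))
print(parse_logical_date("2024-06-15T14"))
```

Static keys such as "us" raise ValueError here, which matches the advice to use date parsing for time-based backfills only.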

Backfill definitions

Attach a BackfillDef to @job(backfill=...) to enable the backfill CLI:

from databricks_bundle_decorators import DailyBackfill

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def my_pipeline():
    ...

The backfill definition only affects key enumeration — it does not change runtime behavior beyond injecting the backfill_key parameter.

| Class | Keys | Example |
| --- | --- | --- |
| DailyBackfill | One per day (YYYY-MM-DD) | 2024-01-01 … 2024-12-31 |
| WeeklyBackfill | One per ISO week (YYYY-WNN) | 2024-W01 … 2024-W52 |
| MonthlyBackfill | One per month (YYYY-MM-01) | 2024-01-01 … 2024-12-01 |
| HourlyBackfill | One per hour (YYYY-MM-DDTHH) | 2024-01-01T00 … 2024-01-01T23 |
| StaticBackfill | Fixed list of strings | ["us", "eu", "jp"] |

All time-based definitions accept start_date, end_date (optional, defaults to "most recent complete period"), tz (IANA timezone), and data_lag (number of periods to subtract from the default end). Key formats are fixed to ISO-8601-compatible strings.
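
As an illustration of how start_date, end_date, and data_lag interact, here is a minimal enumeration sketch for the daily case. `daily_keys` and its `today` parameter are hypothetical (a real definition would read the clock in its configured tz); only the inclusive range and the "most recent complete period minus data_lag" default are taken from the description above.

```python
from datetime import date, timedelta
from typing import Optional


def daily_keys(start_date: str, end_date: Optional[str] = None, *,
               today: date, data_lag: int = 0) -> list[str]:
    """Enumerate YYYY-MM-DD keys from start_date to end_date, both inclusive."""
    start = date.fromisoformat(start_date)
    if end_date is None:
        # Default end: yesterday (the most recent complete day), minus data_lag.
        end = today - timedelta(days=1 + data_lag)
    else:
        end = date.fromisoformat(end_date)
    # An end before the start yields an empty list.
    return [(start + timedelta(days=i)).isoformat() for i in range((end - start).days + 1)]


print(daily_keys("2024-01-01", today=date(2024, 1, 4)))
print(daily_keys("2024-01-01", today=date(2024, 1, 4), data_lag=1))
```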

Multi-key backfill

Time-based backfill definitions support two additional parameters for producing multiple keys per run:

| Parameter | Purpose | Applies during explicit backfill? |
| --- | --- | --- |
| lookback | Include N prior keys (rolling restatement) | Yes |
| collect_schedule_gaps | Include keys between previous cron fire and current key | No |

Use get_backfill_keys() (plural) at runtime to retrieve the full list of keys for the current run.

Rolling restatement (lookback)

Use lookback when the data source delivers corrections for a rolling window on every delivery, e.g. a file on Wednesday contains corrected data for Monday and Tuesday plus new data for Wednesday:

from databricks_bundle_decorators import (
    DailyBackfill, get_backfill_keys, job, task,
)

@job(backfill=DailyBackfill(start_date="2024-01-01", lookback=2))
def corrections_pipeline():
    @task(io_manager=io, partition_by="backfill_key")
    def extract() -> pl.DataFrame:
        # Returns 3 keys: [T-2, T-1, T]
        keys = get_backfill_keys()
        frames = [fetch_and_tag(k) for k in keys]
        return pl.concat(frames)
    ...

lookback always applies — both for scheduled runs and for explicit dbxdec backfill --keys "2026-01-08" invocations — because the source's corrections exist regardless of how the run was triggered.

Schedule gap collection (collect_schedule_gaps)

When a job runs Mon–Fri only (cron excludes weekends) and the source has one file per calendar day, Monday's run needs to collect Saturday's and Sunday's files too:

from databricks.bundles.jobs import CronSchedule

@job(
    backfill=DailyBackfill(
        start_date="2024-01-01",
        collect_schedule_gaps=True,
    ),
    schedule=CronSchedule(
        quartz_cron_expression="0 0 6 ? * 2-6",  # MON-FRI 6am
        timezone_id="UTC",
    ),
)
def weekday_pipeline():
    @task(io_manager=io, partition_by="backfill_key")
    def extract() -> pl.DataFrame:
        keys = get_backfill_keys()  # Monday: [Sat, Sun, Mon]
        frames = [fetch_and_tag(k) for k in keys]
        return pl.concat(frames)
    ...

Schedule gap logic is stateless — determined entirely from the job's cron expression + the current key, with no run history needed. It is bypassed during explicit dbxdec backfill --keys invocations since targeting Saturday explicitly means you want only Saturday's data.
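
The stateless idea can be sketched as a backwards walk from the current key to the previous day on which the schedule fires. This is a simplification: the hypothetical `keys_with_schedule_gaps` takes a set of weekdays instead of parsing a Quartz cron expression, and it handles daily keys only.

```python
from datetime import date, timedelta


def keys_with_schedule_gaps(key: str, fire_weekdays: set[int]) -> list[str]:
    """Return the days after the previous scheduled fire day, up to and including `key`.

    `fire_weekdays` uses date.weekday() numbering (Monday=0 ... Sunday=6).
    """
    if not fire_weekdays & set(range(7)):
        raise ValueError("the schedule must fire on at least one weekday")
    current = date.fromisoformat(key)
    keys = [current]
    day = current - timedelta(days=1)
    # Walk backwards until we reach a day on which the job would have fired.
    while day.weekday() not in fire_weekdays:
        keys.append(day)
        day -= timedelta(days=1)
    return [d.isoformat() for d in reversed(keys)]


MON_TO_FRI = {0, 1, 2, 3, 4}
print(keys_with_schedule_gaps("2024-06-17", MON_TO_FRI))  # 2024-06-17 is a Monday
```

No run history is consulted: the result depends only on the schedule and the current key, which is what makes the approach stateless.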

Combined usage

Both options can be used together. The result is the sorted union:

DailyBackfill(start_date="2024-01-01", lookback=3, collect_schedule_gaps=True)

| Scenario | Schedule gaps | Lookback (3 prior) | Union |
| --- | --- | --- | --- |
| Scheduled Monday | Sat, Sun, Mon | Fri, Sat, Sun, Mon | Fri, Sat, Sun, Mon |
| Scheduled Wednesday | Wed (no gap) | Sun, Mon, Tue, Wed | Sun, Mon, Tue, Wed |
| Explicit backfill Saturday | (bypassed) | Wed, Thu, Fri, Sat | Wed, Thu, Fri, Sat |

--exact flag

Pass --exact to bypass both lookback and schedule gap expansion (e.g. when corrections have already been applied by subsequent runs):

uv run dbxdec backfill my_pipeline --keys "2026-01-03" --exact
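
Taken together, the expansion rules for daily keys (lookback, schedule gaps, explicit runs, and --exact) can be summarised in a few lines. `resolve_keys` is a hypothetical function for illustration; it receives the schedule-gap keys as an argument rather than deriving them from a cron expression.

```python
from datetime import date, timedelta


def resolve_keys(key: str, *, lookback: int = 0, gap_keys: tuple[str, ...] = (),
                 explicit: bool = False, exact: bool = False) -> list[str]:
    """Combine lookback and schedule-gap keys into one sorted, de-duplicated list."""
    if exact:
        # --exact: run only the requested key.
        return [key]
    current = date.fromisoformat(key)
    # Lookback always applies: the key itself plus N prior days.
    keys = {(current - timedelta(days=n)).isoformat() for n in range(lookback + 1)}
    if not explicit:
        # Schedule gaps are bypassed for explicit --keys invocations.
        keys.update(gap_keys)
    # ISO date strings sort chronologically.
    return sorted(keys)


# Scheduled Monday 2024-06-17 with lookback=3 and a weekend gap.
print(resolve_keys("2024-06-17", lookback=3,
                   gap_keys=("2024-06-15", "2024-06-16", "2024-06-17")))
```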

Auto-injection with multi-key

When partition_by="backfill_key" and multi-key backfill is active, the framework cannot auto-inject the backfill_key column (different rows belong to different keys). Your task must stamp each row with the correct key from get_backfill_keys():

@task(io_manager=io, partition_by="backfill_key")
def extract() -> pl.DataFrame:
    keys = get_backfill_keys()
    frames = [
        fetch_data(k).with_columns(pl.lit(k).alias("backfill_key"))
        for k in keys
    ]
    return pl.concat(frames)

If the DataFrame already contains a backfill_key column, auto-injection is suppressed. If it's absent and multiple keys are active, the framework raises ValueError rather than silently picking one.
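
The three cases (column present, one key, several keys) amount to a small decision rule. The sketch below expresses it on plain lists of dicts; `inject_backfill_key` is a hypothetical function, and the real framework applies the rule to DataFrames inside the IoManager.

```python
from typing import Any

Row = dict[str, Any]


def inject_backfill_key(rows: list[Row], active_keys: list[str]) -> list[Row]:
    """Add a backfill_key column when it is missing and unambiguous."""
    if all("backfill_key" in row for row in rows):
        return rows  # column already present: injection is suppressed
    if len(active_keys) != 1:
        raise ValueError(
            "backfill_key column is missing and multiple keys are active; "
            "stamp each row in the task instead"
        )
    # Existing values win over the injected default.
    return [{"backfill_key": active_keys[0], **row} for row in rows]


print(inject_backfill_key([{"value": 1}], ["2024-01-15"]))
```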

Concurrency with overlapping partitions

With lookback > 0, adjacent scheduled runs write overlapping partitions. For merge mode this is safe (Delta optimistic concurrency). For overwrite mode this can cause data loss. Use merge mode or set max_concurrent_runs = 1 on the job when using lookback.

Timezone-aware defaults

All time-based definitions default to tz="UTC". The tz parameter determines which timezone is used to compute the default end_date ("yesterday", "last complete week/month"). Override it when your pipeline is tied to a specific region:

# "yesterday" in Berlin time
DailyBackfill(start_date="2024-01-01", tz="Europe/Berlin")

HourlyBackfill additionally uses tz to handle daylight-saving transitions safely.
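
To see why tz matters for the default end, compare "yesterday" at the same instant in two timezones. This is a standard-library sketch; `default_end_date` is a hypothetical helper, not a library function.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo


def default_end_date(now: datetime, tz: str = "UTC") -> str:
    """Return 'yesterday' as seen from the given IANA timezone."""
    local_today = now.astimezone(ZoneInfo(tz)).date()
    return (local_today - timedelta(days=1)).isoformat()


# 23:30 UTC on 15 June is already 01:30 on 16 June in Berlin (UTC+2 in summer).
instant = datetime(2024, 6, 15, 23, 30, tzinfo=timezone.utc)
print(default_end_date(instant))                   # 2024-06-14
print(default_end_date(instant, "Europe/Berlin"))  # 2024-06-15
```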

StaticBackfill and --start/--end

StaticBackfill.keys() returns all keys regardless of start/end arguments. Using --start/--end with a static backfill definition in the backfill CLI has no effect.

Backfill CLI

The dbxdec backfill command submits one Databricks run per backfill key:

# Backfill all daily keys in an explicit date range
uv run dbxdec backfill my_pipeline --start 2024-01-01 --end 2024-03-31

# Dry run — show keys without submitting
uv run dbxdec backfill my_pipeline --dry-run

# Explicit keys (works even without a job-level backfill definition)
uv run dbxdec backfill my_pipeline --keys "2024-01-01,2024-01-02,2024-01-03"

# Submit most recent keys first
uv run dbxdec backfill my_pipeline --start 2024-01-01 --end 2024-03-31 --reverse

# Wait for all runs to complete and report success/failure
uv run dbxdec backfill my_pipeline --start 2024-01-01 --end 2024-01-07 --wait

Options

| Flag | Description |
| --- | --- |
| --start | Start of range (inclusive) |
| --end | End of range (inclusive) |
| --keys | Comma-separated explicit keys |
| --exact | Disable lookback and schedule-gap expansion (run exactly the specified keys) |
| --reverse | Submit keys in descending order (most recent first) |
| --dry-run | Print keys without submitting |
| --wait | Wait for all runs to complete and exit non-zero on failure |
| --target, -t | Databricks bundle target (e.g. dev, staging, prod) |
| --profile | Databricks CLI profile name |

Runs are submitted sequentially in key order. Databricks handles concurrency via the job's max_concurrent_runs setting and its built-in run queue.

Under the hood the command calls databricks bundle run, which automatically resolves the deployed job name — including any dev-mode prefix (e.g. [dev user] my_pipeline). The Databricks CLI must be installed and on your PATH.

When --wait is used, the CLI polls each run until completion, printing SUCCESS or the failure status for each key. This is useful in CI/CD pipelines where you need to gate on backfill success.

Catchup

The dbxdec catchup subcommand automatically determines which backfill keys are missing and submits only those. It is useful for:

  • First deploy: trigger all backfill keys after deploying a job for the first time.
  • Partial recovery: after a partial backfill, submit only the remaining keys.

# Preview missing keys without submitting
uv run dbxdec catchup my_pipeline --dry-run

# Submit all missing keys
uv run dbxdec catchup my_pipeline

The catchup command counts a key as already launched if it has a successful or still-active run. A key whose runs all ended in a terminal failure state (FAILED, TIMED_OUT, CANCELED) is treated as missing and is relaunched.
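
The selection rule can be sketched as a set difference over the run history. `missing_keys` is a hypothetical function; the failure states come from the text above, while the other state names in the example (SUCCESS, RUNNING) and the shape of the history are assumptions.

```python
TERMINAL_FAILURES = {"FAILED", "TIMED_OUT", "CANCELED"}


def missing_keys(all_keys: list[str], runs: list[tuple[str, str]]) -> list[str]:
    """Return keys with no successful or still-active run.

    `runs` holds (backfill_key, state) pairs taken from the job's run history.
    """
    # Any run that is not a terminal failure counts as "already launched".
    launched = {key for key, state in runs if state not in TERMINAL_FAILURES}
    return [key for key in all_keys if key not in launched]


history = [
    ("2024-01-01", "SUCCESS"),
    ("2024-01-02", "FAILED"),
    ("2024-01-03", "RUNNING"),
]
print(missing_keys(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"], history))
```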

Catchup requires a deployed bundle

catchup resolves the job ID from the bundle state via databricks bundle summary, so the bundle must have been deployed at least once before running catchup.

Cross-partition reads

By default, each downstream task reads only the current partition from its upstream dependencies. To read all partitions instead, use either approach:

Per-edge: all_partitions() wrapper

Wrap a single TaskProxy to mark that specific edge:

import polars as pl

from databricks_bundle_decorators import DailyBackfill, all_partitions, job, task
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@job(backfill=DailyBackfill(start_date="2024-01-01"))
def daily_pipeline():
    @task(io_manager=io, partition_by="backfill_key")
    def extract():
        ...

    @task
    def aggregate(data):
        # data contains ALL partitions
        return data.group_by("region").agg(pl.sum("revenue"))

    data = extract()
    aggregate(all_partitions(data))

Per-task: @task(all_partitions=True)

Mark the consuming task so that all upstream data dependencies read all partitions:

@task(all_partitions=True)
def aggregate(data):
    # every upstream IoManager read gets all partitions
    ...

Both approaches set context.all_partitions = True on the InputContext passed to IoManager.read(). See Custom IoManagers for how to handle this in your own implementations.
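
As a toy illustration of what honoring that flag might look like, the sketch below branches on all_partitions inside read(). Everything here is a stand-in: this `InputContext` is a hypothetical dataclass with only two fields, and `InMemoryIoManager` with its write(key, rows) signature is invented for the example. Refer to Custom IoManagers for the real interfaces.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class InputContext:
    """Hypothetical stand-in: only the fields this sketch needs."""
    backfill_key: Optional[str] = None
    all_partitions: bool = False


class InMemoryIoManager:
    """Toy IoManager that stores one list of rows per partition key."""

    def __init__(self) -> None:
        self._partitions: dict[str, list[Any]] = {}

    def write(self, key: str, rows: list[Any]) -> None:
        self._partitions[key] = list(rows)

    def read(self, context: InputContext) -> list[Any]:
        if context.all_partitions:
            # Cross-partition read: concatenate every stored partition in key order.
            return [row for key in sorted(self._partitions) for row in self._partitions[key]]
        # Default: only the partition for the current backfill key.
        return list(self._partitions.get(context.backfill_key, []))


io = InMemoryIoManager()
io.write("2024-01-01", [1])
io.write("2024-01-02", [2, 3])
print(io.read(InputContext(backfill_key="2024-01-02")))
print(io.read(InputContext(all_partitions=True)))
```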

Limitations

  • No automatic scheduling. Backfill definitions describe the universe of valid keys but do not generate Databricks triggers or schedules. Attach a cron schedule to the job yourself (time-based definitions then derive the key automatically, as described under Automatic key derivation), or use dbxdec backfill for ad-hoc runs.