Polars
Partitioning
All Polars IoManagers support Hive-style partitioning via the
`partition_by` parameter on the `@task` decorator. When `partition_by`
includes `"backfill_key"`, that column is injected into the data automatically
before writing, and reads are automatically filtered to the current backfill
partition.
```python
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@acct.dfs.core.windows.net/data",
)

@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...
```
Per format:
- Parquet, CSV, and NDJSON use `pl.PartitionBy` for LazyFrame sinks.
- DataFrame writes use native `partition_by` (Parquet) or `.lazy().sink_*` with `PartitionBy` (CSV/NDJSON).
- Delta uses `delta_write_options={"partition_by": ...}`.
- Reads use `hive_partitioning=True` (Parquet, CSV, NDJSON) or Delta's native partition pruning.
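To make the layout concrete, the sketch below models Hive-style paths with plain strings: each partition column becomes a `column=value` directory level, and narrowing a read to the current backfill partition amounts to keeping the paths whose `backfill_key` segment matches. The helpers `hive_dir`, `parse_hive_dir`, and `select_backfill`, and the `/data` base path, are hypothetical illustrations, not part of this package; in practice Polars and Delta build and prune these directories.

```python
from typing import Dict, List, Sequence


def hive_dir(base: str, values: Dict[str, str], partition_by: Sequence[str]) -> str:
    """Build base/col1=v1/col2=v2 for the given partition columns."""
    segments = [f"{col}={values[col]}" for col in partition_by]
    # Plain string join keeps URI prefixes such as abfss:// intact.
    return "/".join([base.rstrip("/"), *segments])


def parse_hive_dir(path: str) -> Dict[str, str]:
    """Recover partition values from the key=value segments of a path."""
    return dict(seg.split("=", 1) for seg in path.split("/") if "=" in seg)


def select_backfill(paths: List[str], backfill_key: str) -> List[str]:
    """Keep only the directories that belong to the current backfill partition."""
    return [p for p in paths if parse_hive_dir(p).get("backfill_key") == backfill_key]


partition_by = ["backfill_key", "region"]
rows = [
    {"backfill_key": "2024-01-01", "region": "eu"},
    {"backfill_key": "2024-01-01", "region": "us"},
    {"backfill_key": "2024-01-02", "region": "eu"},
]
paths = [hive_dir("/data", row, partition_by) for row in rows]
print(paths[0])                              # /data/backfill_key=2024-01-01/region=eu
print(select_backfill(paths, "2024-01-01"))  # only the first two paths
```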
Parquet
`databricks_bundle_decorators.io_managers.PolarsParquetIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)`
Bases: IoManager
Persist Polars DataFrames as Parquet on any cloud or local filesystem.
Automatically dispatches based on the return-value type:
- `polars.DataFrame` → `write_parquet` / `read_parquet`
- `polars.LazyFrame` → `sink_parquet` / `scan_parquet`
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read; otherwise (including
unannotated parameters) a lazy scan_parquet is used by default.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `base_path` | `str \| Callable[[], str]` | Root URI for Parquet files: a local path or a cloud storage URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| `storage_options` | `dict[str, str] \| Callable[[], dict[str, str]] \| None` | Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or `None`. Use a callable to defer credential lookup to runtime (essential when credentials come from …); a plain dict also works when credentials are known statically. | `None` |
| `write_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars write call (`write_parquet` or `sink_parquet`). Do not include … | `None` |
| `read_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars read call (`read_parquet` or `scan_parquet`). | `None` |
| `retry` | `RetryConfig \| None` | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by …). | `None` |
Example:

```python
import polars as pl

from databricks_bundle_decorators.io_managers import PolarsParquetIoManager

io = PolarsParquetIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
    storage_options={"account_name": "myaccount", "account_key": "***"},
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_parquet on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_parquet on read
    print(df.collect())
```
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
`base_path` (property)
Resolve `base_path`, calling it first if it is a callable.
`storage_options` (property)
Resolve `storage_options`, calling it first if it is a callable.
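Both properties follow a "value or zero-argument callable" pattern. A minimal sketch, assuming resolution happens on every property access (consistent with "resolved lazily on each read/write" for `storage_options`); the `LazyConfig` class and the `DEPLOY_ENV` variable are hypothetical:

```python
import os
from typing import Callable, Dict, Optional, Union

StrOrFactory = Union[str, Callable[[], str]]
OptionsOrFactory = Union[Dict[str, str], Callable[[], Dict[str, str]], None]


class LazyConfig:
    """Hypothetical holder that resolves callables on every access."""

    def __init__(self, base_path: StrOrFactory, storage_options: OptionsOrFactory = None):
        self._base_path = base_path
        self._storage_options = storage_options

    @property
    def base_path(self) -> str:
        # Call the factory each time so runtime-only values are picked up.
        value = self._base_path
        return value() if callable(value) else value

    @property
    def storage_options(self) -> Optional[Dict[str, str]]:
        value = self._storage_options
        return value() if callable(value) else value


cfg = LazyConfig(base_path=lambda: f"/data/{os.environ.get('DEPLOY_ENV', 'dev')}")
os.environ["DEPLOY_ENV"] = "prod"  # simulate a value that only exists at runtime
print(cfg.base_path)  # /data/prod
```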
`write(context, obj)`
Write a Polars DataFrame or LazyFrame to Parquet.
- `polars.DataFrame` → `write_parquet` (native `partition_by`)
- `polars.LazyFrame` → `sink_parquet` (`pl.PartitionBy`)
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories.
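Type-based write dispatch maps naturally onto `functools.singledispatch`. The sketch below is an assumption about the shape of the logic, not the package's implementation: it uses stand-in classes instead of Polars types, and the hypothetical `parquet_writer_for` returns the method name rather than calling it.

```python
from functools import singledispatch


class DataFrame:  # stand-in for polars.DataFrame
    pass


class LazyFrame:  # stand-in for polars.LazyFrame
    pass


@singledispatch
def parquet_writer_for(obj) -> str:
    # Fallback for any type without a registered handler.
    raise TypeError(f"cannot persist {type(obj).__name__} as Parquet")


@parquet_writer_for.register
def _(obj: DataFrame) -> str:
    return "write_parquet"  # eager frame; native partition_by when partitioned


@parquet_writer_for.register
def _(obj: LazyFrame) -> str:
    return "sink_parquet"  # lazy frame; pl.PartitionBy when partitioned


print(parquet_writer_for(DataFrame()))  # write_parquet
print(parquet_writer_for(LazyFrame()))  # sink_parquet
```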
`read(context)`
Read Parquet as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_parquet (eager). Otherwise returns scan_parquet
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set on the producing @task, reads
from the Hive-partitioned directory. By default only the
current backfill_key partition is returned; use
all_partitions() on the upstream dependency or
@task(all_partitions=True) on the consuming task to read
all partitions.
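The default-versus-all-partitions choice reduces to a small predicate. A sketch under assumptions: the hypothetical `should_filter_to_backfill` folds `all_partitions()` on the dependency and `@task(all_partitions=True)` into one `all_partitions` flag, and assumes that passing `auto_filter=False` to the IoManager disables the filter (the signature exposes `auto_filter`, but its exact semantics are not described here).

```python
from typing import Optional, Sequence


def should_filter_to_backfill(
    partition_by: Optional[Sequence[str]],
    auto_filter: bool = True,
    all_partitions: bool = False,
) -> bool:
    """Decide whether a read is narrowed to the current backfill_key."""
    if not auto_filter or all_partitions:
        return False  # filtering disabled, or every partition requested
    # Only tables partitioned by backfill_key can be narrowed this way.
    return bool(partition_by) and "backfill_key" in partition_by


print(should_filter_to_backfill(["backfill_key", "region"]))                       # True
print(should_filter_to_backfill(["backfill_key", "region"], all_partitions=True))  # False
```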
Delta
Merge / Upsert
mode="merge" is not a valid write mode and will raise a ValueError.
To perform merge/upsert operations, return a DeltaMerge from your
task function.
See Delta Write Modes & Merge for
full examples.
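Semantically, an upsert updates rows whose key already exists and inserts the rest. The in-memory sketch below only illustrates that "update when matched, insert when not matched" rule on lists of dicts; it is not how `DeltaMerge` works (Delta applies the merge as a transaction on the table), and the `upsert` helper is hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def upsert(target: List[Row], source: List[Row], key: str) -> List[Row]:
    """Return a new list: matched keys are updated, unmatched ones inserted."""
    merged = {row[key]: dict(row) for row in target}  # copies; target is untouched
    for row in source:
        if row[key] in merged:
            merged[row[key]].update(row)  # when matched: update columns
        else:
            merged[row[key]] = dict(row)  # when not matched: insert the row
    return list(merged.values())


target = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
source = [{"id": 2, "v": "B"}, {"id": 3, "v": "c"}]
print(upsert(target, source, "id"))
# [{'id': 1, 'v': 'a'}, {'id': 2, 'v': 'B'}, {'id': 3, 'v': 'c'}]
```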
`databricks_bundle_decorators.io_managers.PolarsDeltaIoManager(base_path, storage_options=None, write_options=None, read_options=None, mode='error', *, auto_filter=True, retry=None)`
Bases: IoManager
Persist Polars DataFrames as Delta tables on any cloud or local filesystem.
Write dispatch:
- `polars.DataFrame` → `write_delta`
- `polars.LazyFrame` → `sink_delta`
- `DeltaMerge` → merge/upsert operation
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_delta; otherwise
(including unannotated parameters) a lazy scan_delta is used
by default.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `base_path` | `str \| Callable[[], str]` | Root URI for Delta tables. Each task creates a sub-directory named after its task key. Can be a local path, an Azure URI, or another cloud storage URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| `storage_options` | `dict[str, str] \| Callable[[], dict[str, str]] \| None` | Credentials / options forwarded to Polars (and …). Use a callable to defer credential lookup to runtime. | `None` |
| `write_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars write call (`write_delta` or `sink_delta`). Do not include … | `None` |
| `mode` | `str` | Delta write mode (one of …). For merge operations, return a `DeltaMerge` from the task function instead. | `'error'` |
| `read_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars read call (`read_delta` or `scan_delta`). | `None` |
| `retry` | `RetryConfig \| None` | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by …). | `None` |
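A sketch of the mode check described in the Merge / Upsert note. The allowed set is an assumption (Polars' own `write_delta` modes minus `"merge"`), and `validate_mode` is a hypothetical name:

```python
# Assumption: accepted modes mirror Polars' write_delta modes other than "merge".
ALLOWED_MODES = frozenset({"error", "append", "overwrite", "ignore"})


def validate_mode(mode: str) -> str:
    """Reject "merge" with a pointer to DeltaMerge; reject unknown modes."""
    if mode == "merge":
        raise ValueError(
            'mode="merge" is not a write mode; return a DeltaMerge from the task instead'
        )
    if mode not in ALLOWED_MODES:
        raise ValueError(f"unknown Delta write mode: {mode!r}")
    return mode


print(validate_mode("append"))  # append
try:
    validate_mode("merge")
except ValueError as exc:
    print("rejected:", exc)
```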
Example:

```python
import polars as pl

from databricks_bundle_decorators.io_managers import PolarsDeltaIoManager

io = PolarsDeltaIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.DataFrame:  # write_delta on write
    return pl.DataFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_delta on read
    print(df.collect())
```
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
`base_path` (property)
Resolve `base_path`, calling it first if it is a callable.
`storage_options` (property)
Resolve `storage_options`, calling it first if it is a callable.
`write(context, obj)`
Write a Polars DataFrame, LazyFrame, or DeltaMerge.
- `polars.DataFrame` → `write_delta`
- `polars.LazyFrame` → `sink_delta`
- `DeltaMerge` → merge/upsert operation
When partition_by is set on the @task decorator, writes
with delta_write_options={"partition_by": ...}.
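The resulting keyword arguments have the shape Polars' `DataFrame.write_delta` accepts (`mode=...`, `delta_write_options=...`). A minimal sketch, assuming user-supplied `write_options` may already contain `delta_write_options` and that the partition columns are merged into it rather than replacing it; `build_write_kwargs` is hypothetical:

```python
from typing import Any, Dict, Optional, Sequence


def build_write_kwargs(
    mode: str,
    partition_by: Optional[Sequence[str]] = None,
    write_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Assemble keyword arguments for a write_delta-style call."""
    # The explicit mode wins over any "mode" key in write_options.
    kwargs: Dict[str, Any] = {**(write_options or {}), "mode": mode}
    if partition_by:
        # Merge into user-supplied delta_write_options instead of overwriting them.
        delta_opts = dict(kwargs.get("delta_write_options") or {})
        delta_opts["partition_by"] = list(partition_by)
        kwargs["delta_write_options"] = delta_opts
    return kwargs


print(build_write_kwargs("append", ["backfill_key", "region"]))
# {'mode': 'append', 'delta_write_options': {'partition_by': ['backfill_key', 'region']}}
```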
`write_with_retry(context, obj)`
Write with retry logic.
DeltaMerge and DataFrame/LazyFrame writes are all retried
when RetryConfig is configured.
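Exponential backoff itself is simple to express. The stdlib sketch below is not the package's retry mechanism, and `Backoff` is a hypothetical stand-in for `RetryConfig`, whose fields are not documented here. The `sleep` parameter is injectable so the delays can be observed without waiting.

```python
import time
from dataclasses import dataclass
from typing import Callable, TypeVar

T = TypeVar("T")


@dataclass
class Backoff:
    """Hypothetical stand-in for a retry configuration."""

    max_attempts: int = 3
    initial_delay: float = 1.0  # seconds before the first retry
    multiplier: float = 2.0  # growth factor applied after each failure


def call_with_backoff(
    fn: Callable[[], T],
    cfg: Backoff,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying failures with exponentially growing delays."""
    delay = cfg.initial_delay
    for attempt in range(1, cfg.max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == cfg.max_attempts:
                raise  # out of attempts: surface the last error
            sleep(delay)
            delay *= cfg.multiplier
    raise ValueError("max_attempts must be at least 1")


calls = []
delays = []


def flaky_write() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise OSError("transient storage error")  # simulated failure
    return "written"


print(call_with_backoff(flaky_write, Backoff(initial_delay=0.5), sleep=delays.append))  # written
print(delays)  # [0.5, 1.0]
```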
`read(context)`
Read a Delta table as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_delta (eager). Otherwise returns scan_delta
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by includes "backfill_key", reads are
filtered to the current partition unless the upstream
dependency uses all_partitions() or the consuming
task uses @task(all_partitions=True).
JSON (NDJSON)
`databricks_bundle_decorators.io_managers.PolarsJsonIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)`
Bases: IoManager
Persist Polars DataFrames as NDJSON on any cloud or local filesystem.
Uses newline-delimited JSON (NDJSON) format, the standard for
streaming data pipelines. This is the only JSON variant in Polars
that supports cloud storage (storage_options) and lazy I/O.
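For reference, NDJSON is one JSON object per line with no enclosing array, which is what lets writers append and readers stream line by line. The stdlib sketch below only shows the format; `sink_ndjson` and `scan_ndjson` handle it in practice.

```python
import json
from io import StringIO

records = [{"id": 1, "region": "eu"}, {"id": 2, "region": "us"}]

# Write: serialise each record on its own line; no surrounding [ ... ].
buffer = StringIO()
for record in records:
    buffer.write(json.dumps(record) + "\n")
ndjson_text = buffer.getvalue()
print(ndjson_text, end="")
# {"id": 1, "region": "eu"}
# {"id": 2, "region": "us"}

# Read: parse one line at a time, skipping blank lines.
parsed = [json.loads(line) for line in StringIO(ndjson_text) if line.strip()]
print(parsed == records)  # True
```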
Write dispatch:
- `polars.LazyFrame` → `sink_ndjson`
- `polars.DataFrame` → `.lazy()`, then `sink_ndjson` (routed through the lazy path for cloud storage support)
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_ndjson; otherwise
(including unannotated parameters) a lazy scan_ndjson is used
by default.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `base_path` | `str \| Callable[[], str]` | Root URI for NDJSON files: a local path or a cloud storage URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| `storage_options` | `dict[str, str] \| Callable[[], dict[str, str]] \| None` | Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or `None`. Use a callable to defer credential lookup to runtime. | `None` |
| `write_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars write call (`sink_ndjson`). Do not include … | `None` |
| `read_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars read call (`read_ndjson` or `scan_ndjson`). | `None` |
| `retry` | `RetryConfig \| None` | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by …). | `None` |
Example:

```python
import polars as pl

from databricks_bundle_decorators.io_managers import PolarsJsonIoManager

io = PolarsJsonIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_ndjson on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_ndjson on read
    print(df.collect())
```
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
`base_path` (property)
Resolve `base_path`, calling it first if it is a callable.
`storage_options` (property)
Resolve `storage_options`, calling it first if it is a callable.
`write(context, obj)`
Write a Polars DataFrame or LazyFrame as NDJSON.
- `polars.LazyFrame` → `sink_ndjson`
- `polars.DataFrame` → `.lazy()`, then `sink_ndjson`
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories using
pl.PartitionBy.
`read(context)`
Read NDJSON as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_ndjson (eager). Otherwise returns scan_ndjson
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set, reads from the Hive-partitioned
directory. By default only the current backfill_key
partition is returned; use all_partitions() on the
upstream dependency or @task(all_partitions=True) on
the consuming task to read all partitions.
CSV
`databricks_bundle_decorators.io_managers.PolarsCsvIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True, retry=None)`
Bases: IoManager
Persist Polars DataFrames as CSV on any cloud or local filesystem.
Write dispatch:
- `polars.LazyFrame` → `sink_csv`
- `polars.DataFrame` → `write_csv`
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_csv; otherwise
(including unannotated parameters) a lazy scan_csv is used
by default.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `base_path` | `str \| Callable[[], str]` | Root URI for CSV files: a local path or a cloud storage URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| `storage_options` | `dict[str, str] \| Callable[[], dict[str, str]] \| None` | Credentials / options forwarded to Polars I/O calls. Can be a plain dict, a callable that returns a dict (resolved lazily on each read/write), or `None`. Use a callable to defer credential lookup to runtime. | `None` |
| `write_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars write call (`write_csv` or `sink_csv`). Do not include … | `None` |
| `read_options` | `dict[str, Any] \| None` | Extra keyword arguments forwarded to the Polars read call (`read_csv` or `scan_csv`). | `None` |
| `retry` | `RetryConfig \| None` | Optional retry configuration for write operations. When set, failed writes are retried with exponential backoff (powered by …). | `None` |
Example:

```python
import polars as pl

from databricks_bundle_decorators.io_managers import PolarsCsvIoManager

io = PolarsCsvIoManager(
    base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)

@task(io_manager=io)
def extract() -> pl.LazyFrame:  # sink_csv on write
    return pl.LazyFrame({"a": [1, 2]})

@task
def transform(df: pl.LazyFrame):  # scan_csv on read
    print(df.collect())
```
Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
`base_path` (property)
Resolve `base_path`, calling it first if it is a callable.
`storage_options` (property)
Resolve `storage_options`, calling it first if it is a callable.
`write(context, obj)`
Write a Polars DataFrame or LazyFrame as CSV.
- `polars.LazyFrame` → `sink_csv`
- `polars.DataFrame` → `write_csv` (single file) or `.lazy().sink_csv(PartitionBy)` (partitioned)
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories using
pl.PartitionBy.
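The same Hive layout can be produced by hand to see what ends up on disk. A stdlib-only sketch: it groups rows by the partition columns and writes one CSV per `column=value` directory. The `write_partitioned_csv` helper and the `part-0.csv` file name are hypothetical; Polars chooses its own file names, and this sketch also keeps the partition columns inside each file.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Sequence


def write_partitioned_csv(rows: List[Dict[str, str]], base: Path, partition_by: Sequence[str]) -> List[str]:
    """Write one CSV per partition directory; return the relative file paths."""
    groups: Dict[tuple, List[Dict[str, str]]] = defaultdict(list)
    for row in rows:
        groups[tuple(row[col] for col in partition_by)].append(row)

    written = []
    for values, group in groups.items():
        # e.g. base/backfill_key=2024-01-01/region=eu
        part_dir = base.joinpath(*(f"{col}={val}" for col, val in zip(partition_by, values)))
        part_dir.mkdir(parents=True, exist_ok=True)
        path = part_dir / "part-0.csv"
        with path.open("w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(group[0]))
            writer.writeheader()
            writer.writerows(group)  # partition columns stay in the file too
        written.append(path.relative_to(base).as_posix())
    return sorted(written)


rows = [
    {"backfill_key": "2024-01-01", "region": "eu", "amount": "10"},
    {"backfill_key": "2024-01-01", "region": "us", "amount": "7"},
    {"backfill_key": "2024-01-02", "region": "eu", "amount": "3"},
]
with tempfile.TemporaryDirectory() as tmp:
    files = write_partitioned_csv(rows, Path(tmp), ["backfill_key", "region"])
print(files)  # one file per backfill_key/region pair
```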
`read(context)`
Read CSV as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_csv (eager). Otherwise returns scan_csv
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set, reads from the Hive-partitioned
directory. By default only the current backfill_key
partition is returned; use all_partitions() on the
upstream dependency or @task(all_partitions=True) on
the consuming task to read all partitions.