Polars¶
Partitioning¶
All Polars IoManagers support Hive-style partitioning via the
partition_by parameter on the @task decorator. When partition_by
includes "backfill_key", the column is auto-injected before writing
and auto-filtered on read.
io = PolarsParquetIoManager(
base_path="abfss://lake@acct.dfs.core.windows.net/data",
)
@task(io_manager=io, partition_by=["backfill_key", "region"])
def extract(): ...
Parquet, CSV, and NDJSON use pl.PartitionBy for LazyFrame sinks.
DataFrame writes use native partition_by (Parquet) or
.lazy().sink_* with PartitionBy (CSV/NDJSON).
Delta uses delta_write_options={"partition_by": ...}.
Reads use hive_partitioning=True (Parquet, CSV, NDJSON) or
Delta's native partition pruning.
Parquet¶
databricks_bundle_decorators.io_managers.PolarsParquetIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)
¶
Bases: IoManager
Persist Polars DataFrames as Parquet on any cloud or local filesystem.
Automatically dispatches based on return-value type:
- polars.DataFrame → write_parquet / read_parquet
- polars.LazyFrame → sink_parquet / scan_parquet
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read; otherwise (including
unannotated parameters) a lazy scan_parquet is used by default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for Parquet files. Can be a local path or a cloud URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| storage_options | dict[str, str] \| Callable[[], dict[str, str]] \| None | Credentials / options forwarded to Polars I/O calls. Can be a plain dict or a callable that returns a dict, resolved lazily on each read/write. Use a callable to defer credential lookup to runtime; a plain dict also works when credentials are known statically. | None |
| write_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars write call. | None |
| read_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars read call. | None |
Example
::
from databricks_bundle_decorators.io_managers import PolarsParquetIoManager
io = PolarsParquetIoManager(
base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
storage_options={"account_name": "myaccount", "account_key": "***"},
)
@task(io_manager=io)
def extract() -> pl.LazyFrame: # sink_parquet on write
return pl.LazyFrame({"a": [1, 2]})
@task
def transform(df: pl.LazyFrame): # scan_parquet on read
print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
base_path
property
¶
Resolve base_path, calling it first if it is a callable.
storage_options
property
¶
Resolve storage_options, calling it first if it is a callable.
write(context, obj)
¶
Write a Polars DataFrame or LazyFrame to Parquet.
- polars.DataFrame → write_parquet (native partition_by)
- polars.LazyFrame → sink_parquet (pl.PartitionBy)
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories.
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
read(context)
¶
Read Parquet as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_parquet (eager). Otherwise returns scan_parquet
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set on the producing @task, reads
from the Hive-partitioned directory. By default only the
current backfill_key partition is returned; use
all_partitions() on the upstream dependency or
@task(all_partitions=True) on the consuming task to read
all partitions.
Source code in src/databricks_bundle_decorators/io_managers/polars_parquet.py
Delta¶
databricks_bundle_decorators.io_managers.PolarsDeltaIoManager(base_path, storage_options=None, write_options=None, read_options=None, mode='error', *, auto_filter=True)
¶
Bases: IoManager
Persist Polars DataFrames as Delta tables on any cloud or local filesystem.
Write dispatch:
- polars.DataFrame → write_delta
- polars.LazyFrame → sink_delta
- deltalake.table.TableMerger → .execute() (for merge operations with predicate / action chaining)
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_delta; otherwise
(including unannotated parameters) a lazy scan_delta is used
by default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for Delta tables. Each task creates a sub-directory named after its task key. Can be a local path, an Azure URI, or another cloud URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| storage_options | dict[str, str] \| Callable[[], dict[str, str]] \| None | Credentials / options forwarded to the underlying I/O calls. Can be a plain dict or a callable that returns a dict, resolved lazily on each read/write; use a callable to defer credential lookup to runtime. | None |
| write_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars write call. | None |
| mode | str | Delta write mode. For merge operations, ignore this parameter and return a fully-configured TableMerger instead. | 'error' |
| read_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars read call. | None |
Example
::
from databricks_bundle_decorators.io_managers import PolarsDeltaIoManager
io = PolarsDeltaIoManager(
base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)
@task(io_manager=io)
def extract() -> pl.DataFrame:
return pl.DataFrame({"a": [1, 2]})
@task
def transform(df: pl.LazyFrame): # scan_delta on read
print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
base_path
property
¶
Resolve base_path, calling it first if it is a callable.
storage_options
property
¶
Resolve storage_options, calling it first if it is a callable.
write(context, obj)
¶
Write a Polars DataFrame, LazyFrame, or TableMerger.
- polars.DataFrame → write_delta
- polars.LazyFrame → sink_delta
- deltalake.table.TableMerger → .execute()
When partition_by is set on the @task decorator, writes
with delta_write_options={"partition_by": ...}.
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
read(context)
¶
Read a Delta table as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_delta (eager). Otherwise returns scan_delta
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by includes "backfill_key", reads are
filtered to the current partition unless the upstream
dependency uses all_partitions() or the consuming
task uses @task(all_partitions=True).
Source code in src/databricks_bundle_decorators/io_managers/polars_delta.py
JSON (NDJSON)¶
databricks_bundle_decorators.io_managers.PolarsJsonIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)
¶
Bases: IoManager
Persist Polars DataFrames as NDJSON on any cloud or local filesystem.
Uses newline-delimited JSON (NDJSON) format, the standard for
streaming data pipelines. This is the only JSON variant in Polars
that supports cloud storage (storage_options) and lazy I/O.
Write dispatch:
- polars.LazyFrame → sink_ndjson
- polars.DataFrame → .lazy() then sink_ndjson (routed through the lazy path for cloud storage support)
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_ndjson; otherwise
(including unannotated parameters) a lazy scan_ndjson is used
by default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for NDJSON files. Can be a local path or a cloud URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| storage_options | dict[str, str] \| Callable[[], dict[str, str]] \| None | Credentials / options forwarded to Polars I/O calls. Can be a plain dict or a callable that returns a dict, resolved lazily on each read/write; use a callable to defer credential lookup to runtime. | None |
| write_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars write call. | None |
| read_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars read call. | None |
Example
::
from databricks_bundle_decorators.io_managers import PolarsJsonIoManager
io = PolarsJsonIoManager(
base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)
@task(io_manager=io)
def extract() -> pl.LazyFrame: # sink_ndjson on write
return pl.LazyFrame({"a": [1, 2]})
@task
def transform(df: pl.LazyFrame): # scan_ndjson on read
print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
base_path
property
¶
Resolve base_path, calling it first if it is a callable.
storage_options
property
¶
Resolve storage_options, calling it first if it is a callable.
write(context, obj)
¶
Write a Polars DataFrame or LazyFrame as NDJSON.
- polars.LazyFrame → sink_ndjson
- polars.DataFrame → .lazy() then sink_ndjson
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories using
pl.PartitionBy.
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
read(context)
¶
Read NDJSON as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_ndjson (eager). Otherwise returns scan_ndjson
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set, reads from the Hive-partitioned
directory. By default only the current backfill_key
partition is returned; use all_partitions() on the
upstream dependency or @task(all_partitions=True) on
the consuming task to read all partitions.
Source code in src/databricks_bundle_decorators/io_managers/polars_json.py
CSV¶
databricks_bundle_decorators.io_managers.PolarsCsvIoManager(base_path, storage_options=None, write_options=None, read_options=None, *, auto_filter=True)
¶
Bases: IoManager
Persist Polars DataFrames as CSV on any cloud or local filesystem.
Write dispatch:
- polars.LazyFrame → sink_csv
- polars.DataFrame → write_csv
On the read side, the downstream task's parameter type annotation
determines the method used. Annotate the parameter as
pl.DataFrame to receive an eager read_csv; otherwise
(including unannotated parameters) a lazy scan_csv is used
by default.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_path | str \| Callable[[], str] | Root URI for CSV files. Can be a local path or a cloud URI. Can also be a callable that returns a string, resolved lazily at runtime; use this for multi-environment deployments where the path depends on job parameters. | required |
| storage_options | dict[str, str] \| Callable[[], dict[str, str]] \| None | Credentials / options forwarded to Polars I/O calls. Can be a plain dict or a callable that returns a dict, resolved lazily on each read/write; use a callable to defer credential lookup to runtime. | None |
| write_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars write call. | None |
| read_options | dict[str, Any] \| None | Extra keyword arguments forwarded to the Polars read call. | None |
Example
::
from databricks_bundle_decorators.io_managers import PolarsCsvIoManager
io = PolarsCsvIoManager(
base_path="abfss://lake@myaccount.dfs.core.windows.net/staging",
)
@task(io_manager=io)
def extract() -> pl.LazyFrame: # sink_csv on write
return pl.LazyFrame({"a": [1, 2]})
@task
def transform(df: pl.LazyFrame): # scan_csv on read
print(df.collect())
Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
base_path
property
¶
Resolve base_path, calling it first if it is a callable.
storage_options
property
¶
Resolve storage_options, calling it first if it is a callable.
write(context, obj)
¶
Write a Polars DataFrame or LazyFrame as CSV.
- polars.LazyFrame → sink_csv
- polars.DataFrame → write_csv (single file) or .lazy().sink_csv with PartitionBy (partitioned)
When partition_by is set on the @task decorator, writes
to Hive-style partitioned directories using
pl.PartitionBy.
Source code in src/databricks_bundle_decorators/io_managers/polars_csv.py
read(context)
¶
Read CSV as a LazyFrame or DataFrame.
If the downstream parameter is annotated as polars.DataFrame,
returns read_csv (eager). Otherwise returns scan_csv
(lazy polars.LazyFrame) — this is the default for
unannotated parameters.
When partition_by is set, reads from the Hive-partitioned
directory. By default only the current backfill_key
partition is returned; use all_partitions() on the
upstream dependency or @task(all_partitions=True) on
the consuming task to read all partitions.