Task Values

databricks_bundle_decorators.task_values.set_task_value(key, value)

Write a small value into Databricks task values.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `str` | Unique key for the value within this task. | *required* |
| `value` | `TaskValue` | Any JSON-serializable value (`str`, `int`, `float`, `bool`, `None`, `list`, or `dict`). | *required* |

Raises:

| Type | Description |
| --- | --- |
| `TypeError` | If `value` is not JSON-serializable. |
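The `TaskValue` type referenced above is not reproduced on this page. A plausible shape, inferred purely from the docstring's list of accepted types (the real alias in `task_values.py` may be narrower or recursive):

```python
from typing import Union

# Hypothetical reconstruction of the TaskValue alias, inferred from the
# docstring; the actual definition in task_values.py may differ.
TaskValue = Union[str, int, float, bool, None, list, dict]
```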

Source code in src/databricks_bundle_decorators/task_values.py
```python
def set_task_value(key: str, value: TaskValue) -> None:
    """Write a small value into Databricks task values.

    Parameters
    ----------
    key:
        Unique key for the value within this task.
    value:
        Any JSON-serializable value (``str``, ``int``, ``float``, ``bool``,
        ``None``, ``list``, or ``dict``).

    Raises
    ------
    TypeError
        If *value* is not JSON-serializable.
    """
    try:
        json.dumps(value)
    except (TypeError, ValueError) as exc:
        raise TypeError(
            f"TaskValues must be JSON-serializable, "
            f"got {type(value).__name__}: {exc}. "
            f"Use an IoManager for complex data."
        ) from exc

    if _is_databricks_runtime():
        # On Databricks: let any API/permission error propagate so that
        # real runtime failures are visible rather than silently swallowed.
        from pyspark.dbutils import DBUtils  # type: ignore[import-not-found]  # Databricks-only
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        dbutils = DBUtils(spark)
        dbutils.jobs.taskValues.set(key=key, value=value)
    else:
        # Local / testing fallback — use the current task key if set by
        # the runtime runner, otherwise fall back to "__current__".
        store_key = _current_task_key or "__current__"
        _local_task_values.setdefault(store_key, {})[key] = value
```
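
A minimal usage sketch (the keys and values here are illustrative; run outside Databricks, writes go to the in-memory fallback store):

```python
from databricks_bundle_decorators.task_values import set_task_value

# JSON-serializable values pass the upfront check.
set_task_value("row_count", 1250)
set_task_value("status", {"stage": "ingest", "ok": True})

# Anything json.dumps rejects raises TypeError immediately.
try:
    set_task_value("columns", {"a", "b"})  # a set is not JSON-serializable
except TypeError as exc:
    print(exc)  # TaskValues must be JSON-serializable, got set: ...
```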

databricks_bundle_decorators.task_values.get_task_value(task_key, key)

Read a value previously written by an upstream task.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `task_key` | `str` | The `task_key` of the upstream task that called `set_task_value`. | *required* |
| `key` | `str` | The key passed to `set_task_value`. | *required* |
Source code in src/databricks_bundle_decorators/task_values.py
```python
def get_task_value(task_key: str, key: str) -> Any:
    """Read a value previously written by an upstream task.

    Parameters
    ----------
    task_key:
        The ``task_key`` of the upstream task that called `set_task_value`.
    key:
        The key passed to `set_task_value`.
    """
    if _is_databricks_runtime():
        # On Databricks: let any API/permission error propagate.
        from pyspark.dbutils import DBUtils  # type: ignore[import-not-found]  # Databricks-only
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        dbutils = DBUtils(spark)
        return dbutils.jobs.taskValues.get(taskKey=task_key, key=key)
    else:
        return _local_task_values.get(task_key, {}).get(key)
```
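
A sketch of the round trip between the two functions. On Databricks, `task_key` is the upstream task's key from the job definition; in the local fallback shown in the source above, writes land under the current task key, or `"__current__"` when no runner has set one:

```python
from databricks_bundle_decorators.task_values import (
    get_task_value,
    set_task_value,
)

# Upstream task writes a value.
set_task_value("row_count", 1250)

# Downstream task reads it back by task key. Locally (with no current
# task key set) the value is stored under "__current__".
assert get_task_value("__current__", "row_count") == 1250

# The local fallback returns None for keys that were never set.
assert get_task_value("__current__", "missing") is None
```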