Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.freshness_checks.time_partition

from typing import Sequence, Union

from dagster import _check as check
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from dagster._core.definitions.time_window_partitions import TimeWindowPartitionsDefinition

from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..events import CoercibleToAssetKey
from .utils import (
    DEFAULT_FRESHNESS_SEVERITY,
    DEFAULT_FRESHNESS_TIMEZONE,
    build_freshness_checks_for_assets,
)


[docs]@experimental def build_time_partition_freshness_checks( *, assets: Sequence[Union[SourceAsset, CoercibleToAssetKey, AssetsDefinition]], deadline_cron: str, timezone: str = DEFAULT_FRESHNESS_TIMEZONE, severity: AssetCheckSeverity = DEFAULT_FRESHNESS_SEVERITY, ) -> Sequence[AssetChecksDefinition]: r"""For each provided time-window partitioned asset, constructs a freshness check definition. This check passes if the asset is considered "fresh" by the time that execution begins. We consider an asset to be "fresh" if there exists a record for the most recent partition, once the deadline has passed. `deadline_cron` is a cron schedule that defines the deadline for when we should expect the most recent partition to arrive by. Once a tick of the cron schedule has passed, this check will fail if the most recent partition has not been observed/materialized. Let's say I have a daily-partitioned asset which runs every day at 8:00 AM UTC, and takes around 45 minutes to complete. To account for operational delays, I would expect the asset to be done materializing every day by 9:00 AM UTC. I would set the `deadline_cron` to "0 9 \* \* \*". This means that starting at 9:00 AM, this check will expect a record to exist for the previous day's partition. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we'll instead be checking for the most recently passed deadline, which is yesterday (meaning the partition representing the day before yesterday). The timestamp of an observation record is the timestamp indicated by the "dagster/last_updated_timestamp" metadata key. The timestamp of a materialization record is the timestamp at which that record was created. The check will fail at runtime if a non-time-window partitioned asset is passed in. Examples: .. code-block:: python from dagster import build_time_partition_freshness_checks, AssetKey # A daily partitioned asset that is expected to be updated every day within 45 minutes # of 9:00 AM UTC from .somewhere import my_daily_scheduled_assets_def checks = build_time_partition_freshness_checks( [my_daily_scheduled_assets_def], deadline_cron="0 9 * * *", ) Args: assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]): The assets to construct checks for. For each passed in asset, there will be a corresponding constructed `AssetChecksDefinition`. deadline_cron (str): The check will pass if the partition time window most recently completed by the time of the last cron tick has been observed/materialized. timezone (Optional[str]): The timezone to use when calculating freshness and deadline. If not provided, defaults to "UTC". Returns: Sequence[AssetChecksDefinition]: A list of `AssetChecksDefinition` objects, each corresponding to an asset in the `assets` parameter. """ return build_freshness_checks_for_assets( assets=assets, deadline_cron=deadline_cron, timezone=timezone, severity=severity, asset_property_enforcement_lambda=lambda assets_def: check.invariant( isinstance(assets_def.partitions_def, TimeWindowPartitionsDefinition), "Asset is not time-window partitioned.", ), )