Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.freshness_checks.last_update

import datetime
from typing import Optional, Sequence, Union

from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity

from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..events import CoercibleToAssetKey
from .utils import (
    DEFAULT_FRESHNESS_SEVERITY,
    DEFAULT_FRESHNESS_TIMEZONE,
    build_freshness_checks_for_assets,
)


@experimental
def build_last_update_freshness_checks(
    *,
    assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
    lower_bound_delta: datetime.timedelta,
    deadline_cron: Optional[str] = None,
    timezone: str = DEFAULT_FRESHNESS_TIMEZONE,
    severity: AssetCheckSeverity = DEFAULT_FRESHNESS_SEVERITY,
) -> Sequence[AssetChecksDefinition]:
    r"""For each provided asset, constructs a freshness check definition.

    This check passes if the asset is found to be "fresh", and fails if the asset is found to be
    "overdue". An asset is considered fresh if a record (i.e. a materialization or observation)
    exists with a timestamp greater than the "lower bound" derived from the parameters of this
    function.

    `deadline_cron` is a cron schedule that defines the deadline for when we should expect the
    asset to arrive by; if not provided, we consider the deadline to be the execution time of the
    check. `lower_bound_delta` is a timedelta that defines the lower bound for when a record could
    have arrived by. If the most recent record's timestamp is earlier than
    `deadline - lower_bound_delta`, the asset is considered overdue.

    Let's use two examples, one with a deadline_cron set and one without.

    Let's say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes
    around 45 minutes to complete. To account for operational delays, I would expect the asset to
    be done materializing every day by 9:00 AM UTC. I would set the `deadline_cron` to
    "0 9 \* \* \*", and the `lower_bound_delta` to "45 minutes". This would mean that starting at
    9:00 AM, this check will expect a materialization record to have been created no earlier than
    8:15 AM. Note that if the check runs at 8:59 AM, the deadline has not yet passed, and we'll
    instead be checking for the most recently passed deadline, which is yesterday's.

    Let's say I have an observable source asset on a data source which I expect should never be
    more than 3 hours out of date. In this case, there's no fixed schedule for when the data should
    be updated, so I would not provide a `deadline_cron`. Instead, I would set the
    `lower_bound_delta` parameter to "3 hours". This would mean that the check will expect the most
    recent observation record to indicate data no older than 3 hours, relative to the current
    time, regardless of when it runs.

    Examples:
        .. code-block:: python

            # Example 1: Assets that are expected to be updated every day within 45 minutes of
            # 9:00 AM UTC
            import datetime

            from dagster import build_last_update_freshness_checks, AssetKey
            from .somewhere import my_daily_scheduled_assets_def

            checks = build_last_update_freshness_checks(
                assets=[my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
                lower_bound_delta=datetime.timedelta(minutes=45),
                deadline_cron="0 9 * * *",
            )

            # Example 2: Assets that are expected to be updated within 3 hours of the current time
            import datetime

            from dagster import build_last_update_freshness_checks, AssetKey
            from .somewhere import my_observable_source_asset

            checks = build_last_update_freshness_checks(
                assets=[my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
                lower_bound_delta=datetime.timedelta(hours=3),
            )

    Args:
        assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]]): The assets to
            construct checks for. Keyword-only. For each individual asset (of which there can be
            multiple per `AssetsDefinition`), there will be a corresponding constructed
            `AssetChecksDefinition`.
        lower_bound_delta (datetime.timedelta): The check will pass if the asset was updated within
            lower_bound_delta of the current time (no cron) or the most recent tick of the cron
            (cron provided).
        deadline_cron (Optional[str]): Defines the deadline for when we should start checking that
            the asset arrived. If not provided, the deadline is the execution time of the check.
        timezone (str): The timezone to use when calculating freshness and deadline. If not
            provided, defaults to `DEFAULT_FRESHNESS_TIMEZONE` ("UTC").
        severity (AssetCheckSeverity): The severity to assign to the constructed checks. If not
            provided, defaults to `DEFAULT_FRESHNESS_SEVERITY`.

    Returns:
        Sequence[AssetChecksDefinition]: A list of `AssetChecksDefinition` objects, each
            corresponding to an asset in the `assets` parameter.
    """
    # All parameters are forwarded unchanged; the shared helper owns the actual check
    # construction and freshness evaluation logic.
    return build_freshness_checks_for_assets(
        assets=assets,
        deadline_cron=deadline_cron,
        timezone=timezone,
        severity=severity,
        lower_bound_delta=lower_bound_delta,
    )