import datetime
from typing import Optional, Sequence, Union
from dagster._annotations import experimental
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
from ..asset_checks import AssetChecksDefinition
from ..assets import AssetsDefinition, SourceAsset
from ..events import CoercibleToAssetKey
from .utils import (
DEFAULT_FRESHNESS_SEVERITY,
DEFAULT_FRESHNESS_TIMEZONE,
build_freshness_checks_for_assets,
)
[docs]@experimental
def build_last_update_freshness_checks(
*,
assets: Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]],
lower_bound_delta: datetime.timedelta,
deadline_cron: Optional[str] = None,
timezone: str = DEFAULT_FRESHNESS_TIMEZONE,
severity: AssetCheckSeverity = DEFAULT_FRESHNESS_SEVERITY,
) -> Sequence[AssetChecksDefinition]:
r"""For each provided asset, constructs a freshness check definition.
This check passes if the asset is found to be "fresh", and fails if the asset is found to be
"overdue". An asset is considered fresh if a record (i.e. a materialization or observation)
exists with a timestamp greater than the "lower bound" derived from the parameters of this
function.
`deadline_cron` is a cron schedule that defines the deadline for when we should expect the asset
to arrive by; if not provided, we consider the deadline to be the execution time of the check.
`lower_bound_delta` is a timedelta that defines the lower bound for when a record could have
arrived by. If the most recent recent record's timestamp is earlier than
`deadline-lower_bound_delta`, the asset is considered overdue.
Let's use two examples, one with a deadline_cron set and one without.
Let's say I have an asset which runs on a schedule every day at 8:00 AM UTC, and usually takes
around 45 minutes to complete. To account for operational delays, I would expect the asset to be
done materializing every day by 9:00 AM UTC. I would set the `deadline_cron` to "0 9 \* \* \*", and
the `lower_bound_delta` to "45 minutes". This would mean that starting at 9:00 AM, this check
will expect a materialization record to have been created no earlier than 8:15 AM. Note that if
the check runs at 8:59 AM, the deadline has not yet passed, and we'll instead be checking for
the most recently passed deadline, which is yesterday.
Let's say I have an observable source asset on a data source which I expect should never be more
than 3 hours out of date. In this case, there's no fixed schedule for when the data should be
updated, so I would not provide a `deadline_cron`. Instead, I would set the `lower_bound_delta`
parameter to "3 hours". This would mean that the check will expect the most recent observation
record to indicate data no older than 3 hours, relative to the current time, regardless of when it runs.
Examples:
.. code-block:: python
# Example 1: Assets that are expected to be updated every day within 45 minutes of
# 9:00 AM UTC
from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_daily_scheduled_assets_def
checks = build_last_update_freshness_checks(
[my_daily_scheduled_assets_def, AssetKey("my_other_daily_asset_key")],
lower_bound_delta=datetime.timedelta(minutes=45),
deadline_cron="0 9 * * *",
)
# Example 2: Assets that are expected to be updated within 3 hours of the current time
from dagster import build_last_update_freshness_checks, AssetKey
from .somewhere import my_observable_source_asset
checks = build_last_update_freshness_checks(
[my_observable_source_asset, AssetKey("my_other_observable_asset_key")],
lower_bound_delta=datetime.timedelta(hours=3),
)
Args:
assets (Sequence[Union[CoercibleToAssetKey, AssetsDefinition, SourceAsset]): The assets to
construct checks for. For each individual asset (of which there can be multiple per
`AssetsDefinition`, there will be a corresponding constructed `AssetChecksDefinition`.
lower_bound_delta (datetime.timedelta): The check will pass if the asset was updated within
lower_bound_delta of the current_time (no cron) or the most recent tick of the cron
(cron provided).
deadline_cron (Optional[str]): Defines the deadline for when we should start checking
that the asset arrived. If not provided, the deadline is the execution time of the check.
timezone (Optional[str]): The timezone to use when calculating freshness and deadline. If
not provided, defaults to "UTC".
Returns:
Sequence[AssetChecksDefinition]: A list of `AssetChecksDefinition` objects, each
corresponding to an asset in the `assets` parameter.
"""
return build_freshness_checks_for_assets(
assets=assets,
deadline_cron=deadline_cron,
timezone=timezone,
severity=severity,
lower_bound_delta=lower_bound_delta,
)