A freshness check is a type of asset check that allows you to identify Dagster assets that are overdue for a data refresh.
Because freshness checks don't depend on a specific root cause, they're helpful in accounting for unknowns. For example, freshness checks can identify stale assets caused by any of the following:
The pipeline hitting an error and failing
Runs never being scheduled
A backed-up run queue
Runs taking longer than expected to complete
By the end of this guide, you'll understand what freshness checks are and how to implement them in your data pipelines.
An asset is considered fresh if it's been materialized or observed within a defined time window. Otherwise, it's considered overdue and in need of an update. To identify these assets, you can use freshness checks.
Let's say members of your team review a dashboard every morning as part of their daily stand-up. To ensure they have the most up-to-date information, assets powering the dashboard should be materialized no later than 7 AM. Using a freshness check, we can define the time by which the asset should be updated. If the check runs at that time and the asset hasn't been updated, the asset will be considered overdue and the check will fail.
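As a rough sketch of this scenario, the deadline_cron argument of build_last_update_freshness_checks can express the 7 AM deadline. The dashboard_data asset name here is hypothetical:

from datetime import timedelta

from dagster import asset, build_last_update_freshness_checks


# Hypothetical asset powering the dashboard
@asset
def dashboard_data(): ...


# Fails if the asset wasn't updated within the 24 hours leading up
# to the most recent 7 AM deadline
dashboard_freshness_checks = build_last_update_freshness_checks(
    assets=[dashboard_data],
    deadline_cron="0 7 * * *",
    lower_bound_delta=timedelta(hours=24),
)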
Additionally, freshness checks can help communicate SLAs for data freshness. For example, downstream consumers of an asset can look at the checks defined on it to determine when and how often it's expected to be updated.
Freshness checks use an asset's latest update time to determine whether it's overdue. How Dagster determines the last update time depends on the type of asset being targeted:
For source assets, which are assets whose data feeds a Dagster pipeline, freshness checks use the time the asset was last updated to determine freshness. In this guide, we'll demonstrate how to use observable source assets and a schedule to achieve this.
For materializable assets, which are assets materialized by Dagster, Dagster infers the asset's last update time from its latest materialization timestamp. In this guide, we'll demonstrate how to use @asset-decorated assets and a sensor to achieve this.
In this section, we'll demonstrate how to implement freshness checks for source assets. Source assets are assets whose data feeds a Dagster pipeline, but aren't materialized by Dagster.
To run freshness checks on source assets, the checks need to know when the source assets were last updated. Observable source assets, in this case created with the @multi_observable_source_asset decorator, can be used to track the update times of these assets.
The following is an example of multiple Snowflake tables, backed by an observation function that queries Snowflake to find the most recent time the tables were updated. The function yields the update time as metadata to be stored in the Dagster event log:
from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSpec,
    MetadataValue,
    ObserveResult,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )
Next, we'll define a schedule that regularly executes the function in the source_tables observable source asset:
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)
When the code location is loaded and the schedule is turned on, it will automatically kick off runs to observe the asset.
In our example, we expect the source tables to be updated at least every two hours. We'll use the build_last_update_freshness_checks function to produce a set of asset checks that fail if an asset's last_updated_timestamp is more than two hours before the current time:
from datetime import timedelta

from dagster import build_last_update_freshness_checks

source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)
These checks will automatically execute after the observations of the source assets they target, so an additional schedule isn't needed.
Option 2: Use anomaly detection (Dagster Cloud Pro)#
A Dagster Cloud Pro plan is required to use this feature.
Setting custom freshness policies on a large number of source assets can be time-consuming. Instead of applying policies on an asset-by-asset basis, Dagster Cloud Pro users can take advantage of a time series anomaly detection model. Freshness checks that use this approach function the same way as checks with hardcoded parameters.
This model uses data from past materializations and observations to determine whether data is arriving later than expected. Note: If the asset hasn't been updated enough times, the check will pass with a message indicating that more data is needed to detect anomalies.
In the following example, we'll use build_anomaly_detection_freshness_checks to accomplish this:
from dagster_cloud import build_anomaly_detection_freshness_checks

freshness_checks = build_anomaly_detection_freshness_checks(
    assets=[source_tables], params=None
)
The last step is to include the assets, checks, schedule, and resource in a Definitions object. This enables Dagster tools to load everything we've defined. At this point, the finished code should look like this:
from datetime import timedelta

from dagster_snowflake import SnowflakeResource, fetch_last_updated_timestamps

from dagster import (
    AssetSelection,
    AssetSpec,
    Definitions,
    EnvVar,
    MetadataValue,
    ObserveResult,
    ScheduleDefinition,
    build_last_update_freshness_checks,
    define_asset_job,
    multi_observable_source_asset,
)

TABLE_SCHEMA = "PUBLIC"
table_names = ["charges", "customers"]
asset_specs = [AssetSpec(table_name) for table_name in table_names]


@multi_observable_source_asset(specs=asset_specs)
def source_tables(snowflake: SnowflakeResource):
    with snowflake.get_connection() as conn:
        freshness_results = fetch_last_updated_timestamps(
            snowflake_connection=conn.cursor(),
            tables=table_names,
            schema=TABLE_SCHEMA,
        )
        for table_name, last_updated in freshness_results.items():
            yield ObserveResult(
                asset_key=table_name,
                metadata={
                    "dagster/last_updated_timestamp": MetadataValue.timestamp(
                        last_updated
                    )
                },
            )


source_tables_observation_schedule = ScheduleDefinition(
    job=define_asset_job(
        "source_tables_observation_job",
        selection=AssetSelection.assets(source_tables),
    ),
    # Runs every minute. Usually, a much less frequent cadence is necessary,
    # but a short cadence makes it easier to play around with this example.
    cron_schedule="* * * * *",
)

source_table_freshness_checks = build_last_update_freshness_checks(
    assets=[source_tables],
    lower_bound_delta=timedelta(hours=2),
)

defs = Definitions(
    assets=[source_tables],
    asset_checks=[*source_table_freshness_checks],
    schedules=[source_tables_observation_schedule],
    resources={
        "snowflake": SnowflakeResource(
            user=EnvVar("SNOWFLAKE_USER"),
            account=EnvVar("SNOWFLAKE_ACCOUNT"),
            password=EnvVar("SNOWFLAKE_PASSWORD"),
        )
    },
)
Defining freshness checks for materializable assets#
In this section, we'll demonstrate how to implement freshness checks for materializable assets. These are assets that are materialized by a Dagster pipeline.
In this example, we'll use the build_last_update_freshness_checks function to produce an asset check. This check will fail if an asset's latest materialization occurred more than two hours before the current time:
from datetime import timedelta

from dagster import asset, build_last_update_freshness_checks


@asset
def my_asset(): ...


asset1_freshness_checks = build_last_update_freshness_checks(
    assets=[my_asset], lower_bound_delta=timedelta(hours=2)
)
A schedule or sensor is required to ensure the freshness check executes. If the check only runs after the asset has been materialized, it won't be able to detect when a materialization fails to happen.
In this example, we'll use build_sensor_for_freshness_checks to create a sensor to automatically run the check. Based on the check's parameters and last run time, the sensor will run the check when enough time has elapsed that the asset might fail the check:
from dagster import build_sensor_for_freshness_checks

freshness_checks_sensor = build_sensor_for_freshness_checks(
    freshness_checks=[*asset1_freshness_checks]
)
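As with the source assets example, the asset, checks, and sensor need to be included in a Definitions object so that Dagster tools can load them. A minimal sketch, building on the snippets above:

from dagster import Definitions

# Assumes my_asset, asset1_freshness_checks, and freshness_checks_sensor
# from the previous snippets
defs = Definitions(
    assets=[my_asset],
    asset_checks=[*asset1_freshness_checks],
    sensors=[freshness_checks_sensor],
)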