from typing import Callable, Optional
from dagster import (
AssetsDefinition,
AssetSpec,
multi_asset,
)
from dlt.extract.source import DltSource
from dlt.pipeline.pipeline import Pipeline
from .constants import META_KEY_PIPELINE, META_KEY_SOURCE, META_KEY_TRANSLATOR
from .translator import DagsterDltTranslator
def dlt_assets(
    *,
    dlt_source: DltSource,
    dlt_pipeline: Pipeline,
    name: Optional[str] = None,
    group_name: Optional[str] = None,
    dlt_dagster_translator: Optional[DagsterDltTranslator] = None,
) -> Callable[..., AssetsDefinition]:
    """Asset factory for using data load tool (dlt).

    Builds a decorator that wraps a compute function in a Dagster ``multi_asset``
    with one ``AssetSpec`` per resource in ``dlt_source``. Asset keys, dependencies
    and auto-materialize policies are derived from each dlt resource by the
    translator.

    Args:
        dlt_source (DltSource): The DltSource to be ingested.
        dlt_pipeline (Pipeline): The dlt Pipeline defining the destination parameters.
        name (Optional[str], optional): The name of the op.
        group_name (Optional[str], optional): The name of the asset group.
        dlt_dagster_translator (Optional[DagsterDltTranslator], optional): Customization
            object for defining asset parameters from dlt resources. When omitted (or
            ``None``), a new default ``DagsterDltTranslator`` is created for this call.

    Returns:
        Callable[..., AssetsDefinition]: A decorator that turns the decorated compute
        function into a subsettable multi-asset ``AssetsDefinition`` with
        ``compute_kind="dlt"``.

    Examples:
        Loading Hubspot data to Snowflake with an auto materialize policy using the dlt
        verified source:

        .. code-block:: python

            class HubspotDagsterDltTranslator(DagsterDltTranslator):
                def get_auto_materialize_policy(
                    self, resource: DltResource
                ) -> Optional[AutoMaterializePolicy]:
                    return AutoMaterializePolicy.eager().with_rules(
                        AutoMaterializeRule.materialize_on_cron("0 0 * * *")
                    )


            @dlt_assets(
                dlt_source=hubspot(include_history=True),
                dlt_pipeline=pipeline(
                    pipeline_name="hubspot",
                    dataset_name="hubspot",
                    destination="snowflake",
                ),
                name="hubspot",
                group_name="hubspot",
                dlt_dagster_translator=HubspotDagsterDltTranslator(),
            )
            def hubspot_assets(context: AssetExecutionContext, dlt: DltDagsterResource):
                yield from dlt.run(context=context)

        Loading GitHub issue reactions to Snowflake:

        .. code-block:: python

            @dlt_assets(
                dlt_source=github_reactions(
                    "dagster-io", "dagster", items_per_page=100, max_items=250
                ),
                dlt_pipeline=pipeline(
                    pipeline_name="github_issues",
                    dataset_name="github",
                    destination="snowflake",
                ),
                name="github",
                group_name="github",
            )
            def github_reactions_dagster_assets(
                context: AssetExecutionContext, dlt: DltDagsterResource
            ):
                yield from dlt.run(context=context)
    """
    # Resolve the default translator per call instead of once at import time, so
    # independent decorator invocations never share a single translator instance.
    translator = (
        dlt_dagster_translator
        if dlt_dagster_translator is not None
        else DagsterDltTranslator()
    )

    def inner(fn: Callable[..., object]) -> AssetsDefinition:
        """Wrap ``fn`` in a multi-asset with one spec per dlt resource."""
        # Every spec carries the same source/pipeline/translator metadata.
        # NOTE(review): presumably read back at run time by the dlt resource
        # (cf. ``dlt.run(context=context)`` in the docstring examples) — confirm.
        specs = [
            AssetSpec(
                key=translator.get_asset_key(resource),
                deps=translator.get_deps_asset_keys(resource),
                auto_materialize_policy=translator.get_auto_materialize_policy(
                    resource
                ),
                metadata={
                    META_KEY_SOURCE: dlt_source,
                    META_KEY_PIPELINE: dlt_pipeline,
                    META_KEY_TRANSLATOR: translator,
                },
            )
            for resource in dlt_source.resources.values()
        ]
        return multi_asset(
            specs=specs,
            name=name,
            group_name=group_name,
            compute_kind="dlt",
            can_subset=True,
        )(fn)

    return inner