Delta Lake (dagster-deltalake)
This library provides an integration with the Delta Lake storage framework.
dagster_deltalake.DeltaLakeIOManager IOManagerDefinition
Config Schema:
- root_uri (dagster.StringSource): Storage location where Delta tables are stored.
- mode (Union[WriteMode, None], optional): The write mode used to save the output. Default Value: ‘overwrite’
- overwrite_schema (Union[dagster.BoolSource, None], optional): Default Value: False
- writer_engine (Union[WriterEngine, None], optional): Engine passed to write_deltalake. Default Value: ‘pyarrow’
- storage_options (selector): Exactly one of the following storage backends must be configured.
  - azure (strict dict): Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
    Config Schema:
    - account_name (dagster.StringSource)
    - client_id (Union[dagster.StringSource, None], optional)
    - client_secret (Union[dagster.StringSource, None], optional)
    - tenant_id (Union[dagster.StringSource, None], optional)
    - federated_token_file (Union[dagster.StringSource, None], optional)
    - account_key (Union[dagster.StringSource, None], optional)
    - sas_key (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - use_azure_cli (Union[dagster.BoolSource, None], optional)
    - use_fabric_endpoint (Union[dagster.BoolSource, None], optional)
    - msi_resource_id (Union[dagster.StringSource, None], optional)
    - msi_endpoint (Union[dagster.StringSource, None], optional)
    - container_name (Union[dagster.StringSource, None], optional)
  - s3 (strict dict, optional): Storage configuration for Amazon Web Services (AWS) S3 object store.
    Default Value: {"imdsv1_fallback": false}
    Config Schema:
    - access_key_id (Union[dagster.StringSource, None], optional)
    - secret_access_key (Union[dagster.StringSource, None], optional)
    - region (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - endpoint (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - imdsv1_fallback (Union[dagster.BoolSource, None], optional): Default Value: False
    - virtual_hosted_style_request (Union[dagster.StringSource, None], optional)
    - unsigned_payload (Union[dagster.BoolSource, None], optional)
    - checksum (Union[dagster.StringSource, None], optional)
    - metadata_endpoint (Union[dagster.StringSource, None], optional)
    - container_credentials_relative_uri (Union[dagster.StringSource, None], optional)
    - copy_if_not_exists (Union[dagster.StringSource, None], optional)
    - allow_unsafe_rename (Union[dagster.BoolSource, None], optional)
  - local (strict dict, optional): Storage configuration for local object store.
    Default Value:
  - gcs (strict dict, optional): Storage configuration for Google Cloud Storage object store.
    Default Value:
    Config Schema:
    - service_account (Union[dagster.StringSource, None], optional)
    - service_account_key (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - application_credentials (Union[dagster.StringSource, None], optional)
- client_options (Union[strict dict, None], optional): Additional configuration passed to the HTTP client.
- table_config (Union[dict, None], optional): Additional configuration and metadata added to the table on creation.
- schema (Union[dagster.StringSource, None], optional): Name of the schema to use.
- custom_metadata (Union[dict, None], optional): Custom metadata added to the transaction commit.
- writer_properties (Union[dict, None], optional): Writer properties passed to the Rust engine writer.
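Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.

Examples:

from typing import Sequence

import pandas as pd
from dagster import Definitions, asset
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_deltalake import DeltaLakeIOManager, LocalConfig
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler

class MyDeltaLakeIOManager(DeltaLakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DeltaLakePandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyDeltaLakeIOManager(
            root_uri="path/to/deltalake",  # required: where the Delta tables are stored
            storage_options=LocalConfig(),  # required selector: choose one storage backend
        )
    },
)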
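Because storage_options is a Dagster selector, a configuration must choose exactly one of azure, s3, local, or gcs. The sketch below is a run-config dictionary for an I/O manager bound to the resource key io_manager. It selects the s3 backend and reads credentials from environment variables using the {"env": ...} form that StringSource fields accept. The bucket and region are placeholders. validate_selector is a hypothetical helper written for this example; it is not part of dagster or dagster_deltalake.

```python
from typing import Any

# The four branches of the storage_options selector in the config schema.
STORAGE_OPTIONS = {"azure", "s3", "local", "gcs"}


def validate_selector(storage_options: dict[str, Any]) -> str:
    """Hypothetical helper: return the chosen backend, enforcing the one-key selector rule."""
    unknown = set(storage_options) - STORAGE_OPTIONS
    if unknown:
        raise ValueError(f"unknown storage option(s): {sorted(unknown)}")
    chosen = [key for key in storage_options if key in STORAGE_OPTIONS]
    if len(chosen) != 1:
        raise ValueError(f"exactly one storage option must be set, got {chosen}")
    return chosen[0]


# Placeholder values; substitute your own bucket, region, and credential sources.
run_config = {
    "resources": {
        "io_manager": {
            "config": {
                "root_uri": "s3://my-bucket/delta",
                "storage_options": {
                    "s3": {
                        "region": "us-east-1",
                        # StringSource fields can read from environment variables.
                        "access_key_id": {"env": "AWS_ACCESS_KEY_ID"},
                        "secret_access_key": {"env": "AWS_SECRET_ACCESS_KEY"},
                    }
                },
            }
        }
    }
}

io_config = run_config["resources"]["io_manager"]["config"]
print(validate_selector(io_config["storage_options"]))  # s3
```

Keeping credentials in {"env": ...} entries means the configuration can be committed without embedding secrets.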
If you do not provide a schema, Dagster will determine one from the assets and ops that use
the I/O manager. For assets, the schema is taken from the asset key, as in the example above.
For ops, the schema can be specified by including a “schema” entry in the output metadata. If none
of these is provided, the schema defaults to “public”.

import pandas as pd
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
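The rules above can be summarised as a small lookup. The function below is a hypothetical model, not dagster's implementation. It rests on these assumptions:

- Tables live at root_uri/schema/table, since the asset example describes the schema as the parent folder.
- An op's output name becomes its table name.
- For a multi-component key prefix, the component just before the table name is the schema.
- A configured schema value takes precedence over a key prefix or metadata entry. The real I/O manager may reject such a conflict instead.

```python
from typing import Optional, Sequence


def resolve_table_location(
    root_uri: str,
    *,
    configured_schema: Optional[str] = None,
    asset_key_path: Optional[Sequence[str]] = None,
    output_name: Optional[str] = None,
    output_metadata: Optional[dict] = None,
) -> str:
    """Hypothetical model of the schema/table rules; returns root_uri/schema/table."""
    if asset_key_path is not None:
        # Assets: last key component is the table; the preceding component is the schema.
        table = asset_key_path[-1]
        derived_schema = asset_key_path[-2] if len(asset_key_path) > 1 else None
    else:
        # Ops: output name is the table; output metadata may carry a "schema" entry.
        table = output_name
        derived_schema = (output_metadata or {}).get("schema")
    if table is None:
        raise ValueError("need an asset key or an output name")
    schema = configured_schema or derived_schema or "public"
    return f"{root_uri.rstrip('/')}/{schema}/{table}"


print(resolve_table_location("/data/delta", asset_key_path=["my_schema", "my_table"]))
# /data/delta/my_schema/my_table
print(resolve_table_location("/data/delta", output_name="my_table"))
# /data/delta/public/my_table
```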
To use only specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the
In or AssetIn.

import pandas as pd
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
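The effect of the “columns” metadata can be modeled on a column-oriented table held as a dictionary of lists. project_columns is a hypothetical name used only for this sketch. It shows which data reaches the downstream function, not how the I/O manager actually reads the Delta table.

```python
from typing import Any, Mapping, Optional


def project_columns(
    table: Mapping[str, list[Any]], input_metadata: Optional[Mapping[str, Any]] = None
) -> dict[str, list[Any]]:
    """Hypothetical model: keep only the columns named in the input's "columns" metadata."""
    columns = (input_metadata or {}).get("columns")
    if columns is None:
        return dict(table)  # no "columns" entry: every column is loaded
    missing = [c for c in columns if c not in table]
    if missing:
        raise KeyError(f"columns not in table: {missing}")
    return {c: list(table[c]) for c in columns}


stored = {"a": [1, 2, 3], "b": ["x", "y", "z"]}
print(project_columns(stored, {"columns": ["a"]}))  # {'a': [1, 2, 3]}
```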
dagster_deltalake.DeltaLakePyarrowIOManager IOManagerDefinition
Config Schema:
- root_uri (dagster.StringSource): Storage location where Delta tables are stored.
- mode (Union[WriteMode, None], optional): The write mode used to save the output. Default Value: ‘overwrite’
- overwrite_schema (Union[dagster.BoolSource, None], optional): Default Value: False
- writer_engine (Union[WriterEngine, None], optional): Engine passed to write_deltalake. Default Value: ‘pyarrow’
- storage_options (selector): Exactly one of the following storage backends must be configured.
  - azure (strict dict): Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
    Config Schema:
    - account_name (dagster.StringSource)
    - client_id (Union[dagster.StringSource, None], optional)
    - client_secret (Union[dagster.StringSource, None], optional)
    - tenant_id (Union[dagster.StringSource, None], optional)
    - federated_token_file (Union[dagster.StringSource, None], optional)
    - account_key (Union[dagster.StringSource, None], optional)
    - sas_key (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - use_azure_cli (Union[dagster.BoolSource, None], optional)
    - use_fabric_endpoint (Union[dagster.BoolSource, None], optional)
    - msi_resource_id (Union[dagster.StringSource, None], optional)
    - msi_endpoint (Union[dagster.StringSource, None], optional)
    - container_name (Union[dagster.StringSource, None], optional)
  - s3 (strict dict, optional): Storage configuration for Amazon Web Services (AWS) S3 object store.
    Default Value: {"imdsv1_fallback": false}
    Config Schema:
    - access_key_id (Union[dagster.StringSource, None], optional)
    - secret_access_key (Union[dagster.StringSource, None], optional)
    - region (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - endpoint (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - imdsv1_fallback (Union[dagster.BoolSource, None], optional): Default Value: False
    - virtual_hosted_style_request (Union[dagster.StringSource, None], optional)
    - unsigned_payload (Union[dagster.BoolSource, None], optional)
    - checksum (Union[dagster.StringSource, None], optional)
    - metadata_endpoint (Union[dagster.StringSource, None], optional)
    - container_credentials_relative_uri (Union[dagster.StringSource, None], optional)
    - copy_if_not_exists (Union[dagster.StringSource, None], optional)
    - allow_unsafe_rename (Union[dagster.BoolSource, None], optional)
  - local (strict dict, optional): Storage configuration for local object store.
    Default Value:
  - gcs (strict dict, optional): Storage configuration for Google Cloud Storage object store.
    Default Value:
    Config Schema:
    - service_account (Union[dagster.StringSource, None], optional)
    - service_account_key (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - application_credentials (Union[dagster.StringSource, None], optional)
- client_options (Union[strict dict, None], optional): Additional configuration passed to the HTTP client.
- table_config (Union[dict, None], optional): Additional configuration and metadata added to the table on creation.
- schema (Union[dagster.StringSource, None], optional): Name of the schema to use.
- custom_metadata (Union[dict, None], optional): Custom metadata added to the transaction commit.
- writer_properties (Union[dict, None], optional): Writer properties passed to the Rust engine writer.
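Base class for an IO manager definition that reads inputs from and writes outputs to Delta Lake.

Examples:

from typing import Sequence

import pandas as pd
from dagster import Definitions, asset
from dagster._core.storage.db_io_manager import DbTypeHandler
from dagster_deltalake import DeltaLakeIOManager, LocalConfig
from dagster_deltalake_pandas import DeltaLakePandasTypeHandler

class MyDeltaLakeIOManager(DeltaLakeIOManager):
    @staticmethod
    def type_handlers() -> Sequence[DbTypeHandler]:
        return [DeltaLakePandasTypeHandler()]

@asset(
    key_prefix=["my_schema"]  # will be used as the schema (parent folder) in Delta Lake
)
def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
    ...

defs = Definitions(
    assets=[my_table],
    resources={
        "io_manager": MyDeltaLakeIOManager(
            root_uri="path/to/deltalake",  # required: where the Delta tables are stored
            storage_options=LocalConfig(),  # required selector: choose one storage backend
        )
    },
)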
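The mode field in the config schema controls what happens when an output is written to a table that already exists. Its default is ‘overwrite’. The toy model below mirrors the four modes documented for the deltalake package's write_deltalake (error, append, overwrite, ignore), with rows represented as plain dictionaries. It is not the real writer. The exact values accepted by dagster_deltalake's WriteMode should be confirmed against its source. apply_write is a hypothetical name.

```python
from typing import Optional

Rows = list[dict]


def apply_write(existing: Optional[Rows], new_rows: Rows, mode: str = "overwrite") -> Rows:
    """Toy model of write_deltalake modes; `existing` is None when the table does not exist yet."""
    if existing is None:
        return list(new_rows)  # every mode creates the table when it does not exist
    if mode == "overwrite":
        return list(new_rows)  # replace the table contents with the new data
    if mode == "append":
        return existing + list(new_rows)  # add the new rows after the existing ones
    if mode == "ignore":
        return existing  # table exists, so nothing is written
    if mode == "error":
        raise FileExistsError("table already exists")
    raise ValueError(f"unknown mode: {mode!r}")


first = apply_write(None, [{"a": 1}])
print(apply_write(first, [{"a": 2}]))  # [{'a': 2}]
print(apply_write(first, [{"a": 2}], mode="append"))  # [{'a': 1}, {'a': 2}]
```

With the default ‘overwrite’, each materialization replaces the table's contents rather than accumulating rows.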
If you do not provide a schema, Dagster will determine one from the assets and ops that use
the I/O manager. For assets, the schema is taken from the asset key, as in the example above.
For ops, the schema can be specified by including a “schema” entry in the output metadata. If none
of these is provided, the schema defaults to “public”.

import pandas as pd
from dagster import Out, op

@op(
    out={"my_table": Out(metadata={"schema": "my_schema"})}
)
def make_my_table() -> pd.DataFrame:
    ...
To use only specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the
In or AssetIn.

import pandas as pd
from dagster import AssetIn, asset

@asset(
    ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
)
def my_table_a(my_table: pd.DataFrame):
    # my_table will just contain the data from column "a"
    ...
dagster_deltalake.DeltaTableResource ResourceDefinition
Config Schema:
- url (dagster.StringSource)
- storage_options (selector): Exactly one of the following storage backends must be configured.
  - azure (strict dict): Storage configuration for Microsoft Azure Blob or ADLS Gen 2 object store.
    Config Schema:
    - account_name (dagster.StringSource)
    - client_id (Union[dagster.StringSource, None], optional)
    - client_secret (Union[dagster.StringSource, None], optional)
    - tenant_id (Union[dagster.StringSource, None], optional)
    - federated_token_file (Union[dagster.StringSource, None], optional)
    - account_key (Union[dagster.StringSource, None], optional)
    - sas_key (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - use_azure_cli (Union[dagster.BoolSource, None], optional)
    - use_fabric_endpoint (Union[dagster.BoolSource, None], optional)
    - msi_resource_id (Union[dagster.StringSource, None], optional)
    - msi_endpoint (Union[dagster.StringSource, None], optional)
    - container_name (Union[dagster.StringSource, None], optional)
  - s3 (strict dict, optional): Storage configuration for Amazon Web Services (AWS) S3 object store.
    Default Value: {"imdsv1_fallback": false}
    Config Schema:
    - access_key_id (Union[dagster.StringSource, None], optional)
    - secret_access_key (Union[dagster.StringSource, None], optional)
    - region (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - endpoint (Union[dagster.StringSource, None], optional)
    - token (Union[dagster.StringSource, None], optional)
    - imdsv1_fallback (Union[dagster.BoolSource, None], optional): Default Value: False
    - virtual_hosted_style_request (Union[dagster.StringSource, None], optional)
    - unsigned_payload (Union[dagster.BoolSource, None], optional)
    - checksum (Union[dagster.StringSource, None], optional)
    - metadata_endpoint (Union[dagster.StringSource, None], optional)
    - container_credentials_relative_uri (Union[dagster.StringSource, None], optional)
    - copy_if_not_exists (Union[dagster.StringSource, None], optional)
    - allow_unsafe_rename (Union[dagster.BoolSource, None], optional)
  - local (strict dict, optional): Storage configuration for local object store.
    Default Value:
  - gcs (strict dict, optional): Storage configuration for Google Cloud Storage object store.
    Default Value:
    Config Schema:
    - service_account (Union[dagster.StringSource, None], optional)
    - service_account_key (Union[dagster.StringSource, None], optional)
    - bucket (Union[dagster.StringSource, None], optional)
    - application_credentials (Union[dagster.StringSource, None], optional)
- client_options (Union[strict dict, None], optional): Additional configuration passed to the HTTP client.
- version (Union[dagster.IntSource, None], optional): Version of the Delta table to load.
Resource for interacting with a Delta table.

Examples:

from dagster import Definitions, asset
from dagster_deltalake import DeltaTableResource, LocalConfig

@asset
def my_table(delta_table: DeltaTableResource):
    # load() opens the Delta table; to_pandas() reads it into a pandas DataFrame
    df = delta_table.load().to_pandas()

defs = Definitions(
    assets=[my_table],
    resources={
        "delta_table": DeltaTableResource(
            url="/path/to/table",
            storage_options=LocalConfig(),
        )
    },
)
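The version field pins the snapshot that the resource loads. When it is omitted, the latest version is presumably read, as with deltalake's DeltaTable. Delta Lake numbers commits from 0, and reading version N returns the table as it stood after commit N. On the resource, this would mean passing, for example, version=1 alongside url and storage_options. The toy model below replays a hypothetical list of append and overwrite commits to show which rows each version contains. It is not how deltalake reads the transaction log, and load_version is a name invented for this sketch.

```python
from typing import Optional

# A commit is (operation, rows); version N is the table state after commit N.
Commit = tuple[str, list[dict]]


def load_version(commits: list[Commit], version: Optional[int] = None) -> list[dict]:
    """Toy model of time travel: replay commits 0..version (None means the latest)."""
    if not commits:
        raise ValueError("table has no commits")
    latest = len(commits) - 1
    target = latest if version is None else version
    if not 0 <= target <= latest:
        raise ValueError(f"version {target} does not exist (latest is {latest})")
    state: list[dict] = []
    for operation, rows in commits[: target + 1]:
        if operation == "overwrite":
            state = list(rows)  # overwrite replaces everything written before
        elif operation == "append":
            state = state + list(rows)  # append keeps earlier rows
        else:
            raise ValueError(f"unsupported operation: {operation!r}")
    return state


log = [("overwrite", [{"a": 1}]), ("append", [{"a": 2}]), ("overwrite", [{"a": 3}])]
print(load_version(log, version=1))  # [{'a': 1}, {'a': 2}]
print(load_version(log))  # [{'a': 3}]
```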