To fetch the data the dbt models require, we'll write two Dagster assets: one for customers and one for orders.
In /tutorial_template/tutorial_dbt_dagster/assets/__init__.py:
Replace the import section at the top of the file with the following:
# /tutorial_template/tutorial_dbt_dagster/assets/__init__.py
import pandas as pd
from dagster_dbt import load_assets_from_dbt_project
from dagster import asset, file_relative_path
In this step, we've imported the pandas library and Dagster's asset decorator.
Below the import section, add the following asset definitions:
@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data

@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data
Let's take a closer look at the arguments we've provided:
key_prefix - When the assets are materialized, Dagster will store them in DuckDB in the schema defined by the last value in key_prefix. In this case, that's jaffle_shop. The tables will have the same names as the assets that produced them, which are customers_raw and orders_raw.
Because these tables will become the source data for the stg_customers.sql and stg_orders.sql models in the dbt project, the names of the assets must match the table names specified in /tutorial_template/jaffle_shop/models/sources.yml, which you configured in part one of this tutorial.
group_name - When Dagster loads the dbt models as assets, the assets will be placed in an asset group based on the name of the folder (staging) containing the models. Because we want the assets we add to be included in the same group, we defined this as staging.
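To make the naming rule above concrete, here is a minimal plain-Python sketch of how the key_prefix and asset name map to a DuckDB schema and table. The helper table_location is hypothetical and is not part of Dagster; it only mirrors the rule described above, where the last key_prefix value names the schema and the asset name names the table.

```python
from typing import List, Tuple


def table_location(key_prefix: List[str], asset_name: str) -> Tuple[str, str]:
    """Hypothetical helper (not a Dagster API): return (schema, table)."""
    if not key_prefix:
        raise ValueError("key_prefix must contain at least one value")
    # The last value in key_prefix names the schema; the asset name names the table.
    return key_prefix[-1], asset_name


for name in ("customers_raw", "orders_raw"):
    schema, table = table_location(["jaffle_shop"], name)
    print(f"{schema}.{table}")
```

These schema and table pairs are what the source entries in sources.yml need to match for the dbt staging models to find their data.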
At this point, the /tutorial_template/tutorial_dbt_dagster/assets/__init__.py file should look like this:
# /tutorial_template/tutorial_dbt_dagster/assets/__init__.py
import pandas as pd
from dagster_dbt import load_assets_from_dbt_project
from dagster import asset, file_relative_path

@asset(key_prefix=["jaffle_shop"], group_name="staging")
def customers_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/customers.csv")
    return data

@asset(key_prefix=["jaffle_shop"], group_name="staging")
def orders_raw() -> pd.DataFrame:
    data = pd.read_csv("https://docs.dagster.io/assets/orders.csv")
    return data

DBT_PROJECT_PATH = file_relative_path(__file__, "../../jaffle_shop")
DBT_PROFILES = file_relative_path(__file__, "../../jaffle_shop/config")

dbt_assets = load_assets_from_dbt_project(
    project_dir=DBT_PROJECT_PATH, profiles_dir=DBT_PROFILES, key_prefix=["jaffle_shop"]
)
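The two path constants in this file are built relative to the location of __init__.py rather than the current working directory. As a rough illustration, the sketch below uses only the standard library to show where those relative paths end up. The function relative_to_file is a hypothetical stand-in written for this example, and it assumes file_relative_path simply joins the relative path onto the directory containing the file.

```python
import os


def relative_to_file(dunder_file: str, relative_path: str) -> str:
    """Hypothetical stand-in: join relative_path onto the file's directory."""
    return os.path.join(os.path.dirname(dunder_file), relative_path)


# Assumed location of assets/__init__.py, taken from the path used in this tutorial.
assets_init = "/tutorial_template/tutorial_dbt_dagster/assets/__init__.py"

project_path = relative_to_file(assets_init, "../../jaffle_shop")
profiles_path = relative_to_file(assets_init, "../../jaffle_shop/config")

# normpath collapses the ".." segments so the final directories are easy to read.
print(os.path.normpath(project_path))
print(os.path.normpath(profiles_path))
```

Anchoring paths to the file this way means the dbt project is found regardless of the directory Dagster is launched from.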
To materialize the assets, we need to tell Dagster how to handle the assets' inputs and outputs. We can do this using an I/O manager.
In this step, we'll supply a DuckDB I/O manager (duckdb_pandas_io_manager in the code below) to our assets. This resource is an I/O manager that, when assets are materialized, allows:
Upstream assets (customers_raw, orders_raw) to load data into DuckDB. In this example, the I/O manager uses DuckDBPandasTypeHandler to store the pandas DataFrames used in our assets as CSVs and load them into DuckDB.
Downstream assets to read data from DuckDB. We'll add the downstream asset in the next section.
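The division of labor described above can be sketched with a toy example. This is not the DuckDB I/O manager: InMemoryIOManager is a hypothetical class that keeps values in a dict instead of a database, and its method signatures are simplified. Only the method names handle_output and load_input follow Dagster's I/O manager interface; the sample rows are made up for illustration.

```python
from typing import Any, Dict, Tuple


class InMemoryIOManager:
    """Toy stand-in for an I/O manager; a dict plays the role of DuckDB."""

    def __init__(self) -> None:
        self._tables: Dict[Tuple[str, str], Any] = {}

    def handle_output(self, schema: str, table: str, value: Any) -> None:
        # Called after an upstream asset runs: persist what it returned.
        self._tables[(schema, table)] = value

    def load_input(self, schema: str, table: str) -> Any:
        # Called before a downstream asset runs: fetch the stored value.
        return self._tables[(schema, table)]


def customers_raw() -> list:
    # Hypothetical rows standing in for the DataFrame read from customers.csv.
    return [{"id": 1, "first_name": "Ada"}, {"id": 2, "first_name": "Grace"}]


io_manager = InMemoryIOManager()
io_manager.handle_output("jaffle_shop", "customers_raw", customers_raw())

rows = io_manager.load_input("jaffle_shop", "customers_raw")
print(len(rows))
```

The point of the pattern is that the asset function only returns data. Where and how that data is stored is decided by whichever I/O manager is supplied as a resource.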
In /tutorial_template/tutorial_dbt_dagster/__init__.py, replace its contents with the following:
# /tutorial_template/tutorial_dbt_dagster/__init__.py
import os
from dagster_dbt import dbt_cli_resource
from tutorial_dbt_dagster import assets
from tutorial_dbt_dagster.assets import DBT_PROFILES, DBT_PROJECT_PATH
from dagster_duckdb_pandas import duckdb_pandas_io_manager
from dagster import Definitions, load_assets_from_modules
resources = {
    "dbt": dbt_cli_resource.configured(
        {
            "project_dir": DBT_PROJECT_PATH,
            "profiles_dir": DBT_PROFILES,
        },
    ),
    "io_manager": duckdb_pandas_io_manager.configured(
        {"database": os.path.join(DBT_PROJECT_PATH, "tutorial.duckdb")}
    ),
}
defs = Definitions(assets=load_assets_from_modules([assets]), resources=resources)
Now that you've created assets and resources, it's time to materialize the assets! Materializing an asset runs the op it contains and saves the results to persistent storage. In this tutorial, we're saving asset outputs to DuckDB.
In Dagit, click the Reload definitions button. This ensures that Dagit picks up the changes you made in the previous steps.
At this point, the customers_raw and orders_raw assets should display above stg_customers and stg_orders as upstream dependencies.
Click the Materialize all button near the top right corner of the page, which will launch a run to materialize the assets. When the run finishes, the Materialized and Latest Run attributes in each asset will be populated.
After the run completes, you can:
Click the asset to open a sidebar containing info about the asset, including its last materialization stats and a link to view the Asset details page
Click the ID of the Latest Run in an asset (for example, 651489a2) to view the Run details page. This page contains detailed info about the run, including timing information, errors, and logs.
At this point, you've built and materialized two upstream Dagster assets, providing source data to your dbt models. In the last section of the tutorial, we'll show you how to add a downstream asset to the pipeline.