# Source code for dagster_dbt.asset_selection

from typing import AbstractSet, Any, Callable, Mapping, Optional, Sequence

import dagster._check as check
from dagster import AssetKey, AssetSelection
from dagster._annotations import experimental
from dagster._core.definitions.asset_graph import AssetGraph

from dagster_dbt.asset_defs import (
    _get_node_asset_key,
    _is_non_asset_node,
    _select_unique_ids_from_manifest_json,
)


@experimental
class DbtManifestAssetSelection(AssetSelection):
    """Defines a selection of assets from a parsed dbt manifest.json file and a dbt-syntax selection string.

    Args:
        manifest_json (Mapping[str, Any]): The parsed manifest.json file from your dbt project.
        select (str): A dbt-syntax selection string, e.g. tag:foo or config.materialized:table.
        exclude (str): A dbt-syntax exclude string. Defaults to "".
        resource_types (Sequence[str]): The resource types to select. Defaults to ["model"]
            when not provided. An explicitly passed empty list is honored and selects nothing.
            The value is validated as a list of strings.
        node_info_to_asset_key (Callable[[Mapping[str, Any]], AssetKey]): A function that takes a
            dictionary of dbt metadata and returns the AssetKey that you want to represent a given
            model or source. If you pass in a custom function to `load_assets_from_dbt_manifest`,
            you must also pass in the same function here.

    Example:

        .. code-block:: python

            my_dbt_assets = load_assets_from_dbt_manifest(
                manifest_json,
                node_info_to_asset_key=my_node_info_to_asset_key_fn,
            )

            # This will select all assets that have the tag "foo" and are in the path "marts/finance"
            my_selection = DbtManifestAssetSelection(
                manifest_json,
                select="tag:foo,path:marts/finance",
                node_info_to_asset_key=my_node_info_to_asset_key_fn,
            )
    """

    def __init__(
        self,
        manifest_json: Mapping[str, Any],
        select: str,
        exclude: str = "",
        resource_types: Optional[Sequence[str]] = None,
        node_info_to_asset_key: Callable[[Mapping[str, Any]], AssetKey] = _get_node_asset_key,
    ):
        self.manifest_json = check.dict_param(manifest_json, "manifest_json")
        self.select = check.str_param(select, "select")
        self.exclude = check.str_param(exclude, "exclude")
        # Apply the ["model"] default only when the argument is omitted. The previous
        # `... or ["model"]` also replaced an explicit empty list, silently widening the
        # selection instead of selecting nothing.
        self.resource_types = (
            ["model"]
            if resource_types is None
            else check.list_param(resource_types, "resource_types", of_type=str)
        )
        self.node_info_to_asset_key = check.callable_param(
            node_info_to_asset_key, "node_info_to_asset_key"
        )

    def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]:
        """Resolve the dbt selection into the set of matching asset keys.

        Applies ``select`` / ``exclude`` to the manifest. It keeps only entries whose
        ``resource_type`` is in ``resource_types`` and that ``_is_non_asset_node`` does not
        reject. Each surviving entry is mapped to an AssetKey with ``node_info_to_asset_key``.

        Args:
            asset_graph (AssetGraph): Not consulted; the selection is driven entirely by the
                manifest.

        Returns:
            AbstractSet[AssetKey]: The asset keys of the selected dbt entries.

        Raises:
            KeyError: If the manifest lacks any of the "nodes", "sources", "metrics" or
                "exposures" sections, since each is indexed directly.
        """
        # Index every section the selector can return ids from, so that each selected
        # unique_id can be resolved to its metadata regardless of which section holds it.
        dbt_nodes = {
            **self.manifest_json["nodes"],
            **self.manifest_json["sources"],
            **self.manifest_json["metrics"],
            **self.manifest_json["exposures"],
        }
        selected_ids = _select_unique_ids_from_manifest_json(
            manifest_json=self.manifest_json, select=self.select, exclude=self.exclude
        )

        keys = set()
        for unique_id in selected_ids:
            node_info = dbt_nodes[unique_id]
            # Check the resource type first so that _is_non_asset_node only sees entries of a
            # requested type (same short-circuit order as before).
            if node_info["resource_type"] not in self.resource_types:
                continue
            if _is_non_asset_node(node_info):
                continue
            keys.add(self.node_info_to_asset_key(node_info))
        return keys