Source code for dagster_pandas.data_frame

import pandas as pd
from dagster import (
    AssetMaterialization,
    DagsterInvariantViolationError,
    DagsterType,
    Field,
    MetadataEntry,
    StringSource,
    TypeCheck,
    _check as check,
    dagster_type_loader,
    dagster_type_materializer,
)
from dagster._annotations import experimental
from dagster._check import CheckError
from dagster._config import Selector
from dagster._core.definitions.metadata import normalize_metadata
from dagster._core.errors import DagsterInvalidMetadata
from dagster._utils import dict_without_keys

from dagster_pandas.constraints import (
    ColumnDTypeFnConstraint,
    ColumnDTypeInSetConstraint,
    ConstraintViolationException,
)
from dagster_pandas.validation import PandasColumn, validate_constraints

# Constraint classes that _construct_constraint_list leaves out of the per-column
# bullet list; _build_column_header renders these dtype constraints in the column
# header line instead.
CONSTRAINT_BLACKLIST = {ColumnDTypeFnConstraint, ColumnDTypeInSetConstraint}


@dagster_type_materializer(
    Selector(
        {
            "csv": {
                "path": StringSource,
                "sep": Field(StringSource, is_required=False, default_value=","),
            },
            "parquet": {"path": StringSource},
            "table": {"path": StringSource},
            "pickle": {"path": StringSource},
        },
    )
)
def dataframe_materializer(_context, config, pandas_df):
    """Write ``pandas_df`` to disk in the format selected by ``config``.

    Args:
        _context: Unused.
        config (dict): Mapping whose first entry is ``{file_type: file_options}``,
            where ``file_type`` is one of ``csv``, ``parquet``, ``table`` or
            ``pickle`` and ``file_options`` contains at least ``path``.
        pandas_df (pd.DataFrame): The dataframe to write.

    Returns:
        The result of ``AssetMaterialization.file`` for the written path.
    """
    check.inst_param(pandas_df, "pandas_df", pd.DataFrame)
    file_type, file_options = tuple(config.items())[0]

    # Reject unknown formats before touching any of the options.
    if file_type not in ("csv", "parquet", "table", "pickle"):
        check.failed("Unsupported file_type {file_type}".format(file_type=file_type))

    destination = file_options["path"]
    if file_type == "csv":
        # Every option other than "path" (i.e. "sep") is forwarded to pandas.
        csv_kwargs = dict_without_keys(file_options, "path")
        pandas_df.to_csv(destination, index=False, **csv_kwargs)
    if file_type == "parquet":
        pandas_df.to_parquet(destination)
    if file_type == "table":
        # "table" is written as tab-separated text.
        pandas_df.to_csv(destination, sep="\t", index=False)
    if file_type == "pickle":
        pandas_df.to_pickle(destination)

    return AssetMaterialization.file(file_options["path"])


@dagster_type_loader(
    Selector(
        {
            "csv": {
                "path": StringSource,
                "sep": Field(StringSource, is_required=False, default_value=","),
            },
            "parquet": {"path": StringSource},
            "table": {"path": StringSource},
            "pickle": {"path": StringSource},
        },
    )
)
def dataframe_loader(_context, config):
    """Read a dataframe from disk in the format selected by ``config``.

    Args:
        _context: Unused.
        config (dict): Mapping whose first entry is ``{file_type: file_options}``,
            where ``file_type`` is one of ``csv``, ``parquet``, ``table`` or
            ``pickle`` and ``file_options`` contains at least ``path``.

    Returns:
        Whatever the matching ``pd.read_*`` call returns.

    Raises:
        DagsterInvariantViolationError: If ``file_type`` is not one of the
            supported formats.
    """
    file_type, file_options = tuple(config.items())[0]

    if file_type == "csv":
        source = file_options["path"]
        # Every option other than "path" (i.e. "sep") is forwarded to pandas.
        csv_kwargs = dict_without_keys(file_options, "path")
        return pd.read_csv(source, **csv_kwargs)

    if file_type == "parquet":
        return pd.read_parquet(file_options["path"])

    if file_type == "table":
        # "table" is read as tab-separated text.
        return pd.read_csv(file_options["path"], sep="\t")

    if file_type == "pickle":
        return pd.read_pickle(file_options["path"])

    raise DagsterInvariantViolationError(
        "Unsupported file_type {file_type}".format(file_type=file_type)
    )


def df_type_check(_, value):
    """Type check for the plain pandas dataframe dagster type.

    Succeeds only when ``value`` is a ``pd.DataFrame``; on success the result
    carries the row count and the column labels as metadata entries.
    """
    if isinstance(value, pd.DataFrame):
        # Column labels are stringified because they may be things like datetimes.
        column_labels = [str(label) for label in value.columns]
        return TypeCheck(
            success=True,
            metadata_entries=[
                MetadataEntry("row_count", value=str(len(value))),
                MetadataEntry("metadata", value={"columns": column_labels}),
            ],
        )
    return TypeCheck(success=False)


# Dagster type for plain pandas dataframes: values are checked with df_type_check,
# loaded from config with dataframe_loader and written out with
# dataframe_materializer.
DataFrame = DagsterType(
    name="PandasDataFrame",
    description="""Two-dimensional size-mutable, potentially heterogeneous
    tabular data structure with labeled axes (rows and columns).
    See http://pandas.pydata.org/""",
    loader=dataframe_loader,
    materializer=dataframe_materializer,
    type_check_fn=df_type_check,
    typing_type=pd.DataFrame,
)


def _construct_constraint_list(constraints):
    def add_bullet(constraint_list, constraint_description):
        return constraint_list + "+ {constraint_description}\n".format(
            constraint_description=constraint_description
        )

    constraint_list = ""
    for constraint in constraints:
        if constraint.__class__ not in CONSTRAINT_BLACKLIST:
            constraint_list = add_bullet(constraint_list, constraint.markdown_description)
    return constraint_list


def _build_column_header(column_name, constraints):
    header = "**{column_name}**".format(column_name=column_name)
    for constraint in constraints:
        if isinstance(constraint, ColumnDTypeInSetConstraint):
            dtypes_tuple = tuple(constraint.expected_dtype_set)
            return header + ": `{expected_dtypes}`".format(
                expected_dtypes=dtypes_tuple if len(dtypes_tuple) > 1 else dtypes_tuple[0]
            )
        elif isinstance(constraint, ColumnDTypeFnConstraint):
            return header + ": Validator `{expected_dtype_fn}`".format(
                expected_dtype_fn=constraint.type_fn.__name__
            )
    return header


def create_dagster_pandas_dataframe_description(description, columns):
    """Build the markdown description for a dataframe type.

    The result is ``description``, a ``### Columns`` heading, and then for each
    column its header line followed by its constraint bullet list.
    """
    sections = ["\n".join([description, "### Columns", ""])]
    for column in columns:
        header = _build_column_header(column.name, column.constraints)
        bullets = _construct_constraint_list(column.constraints)
        sections.append("{}\n{}\n".format(header, bullets))
    return "".join(sections)


def create_dagster_pandas_dataframe_type(
    name,
    description=None,
    columns=None,
    event_metadata_fn=None,
    dataframe_constraints=None,
    loader=None,
    materializer=None,
):
    """
    Constructs a custom pandas dataframe dagster type.

    Args:
        name (str): Name of the dagster pandas type.
        description (Optional[str]): A markdown-formatted string, displayed in tooling.
            The column headers and constraint lists are appended to it.
        columns (Optional[List[PandasColumn]]): A list of :py:class:`~dagster.PandasColumn`
            objects which express dataframe column schemas and constraints.
        event_metadata_fn (Optional[Callable]): A callable which takes your dataframe and
            returns either a dict with string label keys and metadata values, or a
            List[MetadataEntry]. It is only invoked when the type check passes.
        dataframe_constraints (Optional[List[DataFrameConstraint]]): A list of objects that
            inherit from :py:class:`~dagster.DataFrameConstraint`. This allows you to express
            dataframe-level constraints.
        loader (Optional[DagsterTypeLoader]): An instance of a class that inherits from
            :py:class:`~dagster.DagsterTypeLoader`. If None, we will default to using
            `dataframe_loader`.
        materializer (Optional[DagsterTypeMaterializer]): An instance of a class that inherits
            from :py:class:`~dagster.DagsterTypeMaterializer`. If None, we will default to using
            `dataframe_materializer`.

    Returns:
        The DagsterType built from the arguments above.
    """
    # Custom loaders/materializers can be plugged in so that users can load and
    # materialize their dataframes via configuration their own way when the default
    # configs don't suffice. This is purely optional.
    check.str_param(name, "name")
    event_metadata_fn = check.opt_callable_param(event_metadata_fn, "event_metadata_fn")
    base_description = check.opt_str_param(description, "description", default="")
    checked_columns = check.opt_list_param(columns, "columns", of_type=PandasColumn)
    description = create_dagster_pandas_dataframe_description(base_description, checked_columns)

    def _dagster_type_check(_, value):
        if not isinstance(value, pd.DataFrame):
            wrong_type_message = (
                "Must be a pandas.DataFrame. Got value of type. {type_name}".format(
                    type_name=type(value).__name__
                )
            )
            return TypeCheck(success=False, description=wrong_type_message)

        try:
            validate_constraints(
                value,
                pandas_columns=columns,
                dataframe_constraints=dataframe_constraints,
            )
        except ConstraintViolationException as violation:
            return TypeCheck(success=False, description=str(violation))

        # Summary metadata is only computed when a metadata function was supplied.
        if event_metadata_fn:
            entries = _execute_summary_stats(name, value, event_metadata_fn)
        else:
            entries = None
        return TypeCheck(success=True, metadata_entries=entries)

    return DagsterType(
        name=name,
        type_check_fn=_dagster_type_check,
        loader=loader or dataframe_loader,
        materializer=materializer or dataframe_materializer,
        description=description,
        typing_type=pd.DataFrame,
    )
@experimental
def create_structured_dataframe_type(
    name,
    description=None,
    columns_validator=None,
    columns_aggregate_validator=None,
    dataframe_validator=None,
    loader=None,
    materializer=None,
):
    """
    Create a DagsterType whose type check runs the supplied validators against a
    pandas DataFrame.

    Args:
        name (str): the name of the new type
        description (Optional[str]): the description of the new type
        columns_validator (Optional[Union[ColumnConstraintWithMetadata, MultiColumnConstraintWithMetadata]]):
                    what column-level row by row validation you want to have applied.
                    Leave empty for no column-level row by row validation.
        columns_aggregate_validator (Optional[Union[ColumnAggregateConstraintWithMetadata,
                                    MultiAggregateConstraintWithMetadata]]):
                    what column-level aggregate validation you want to have applied,
                    Leave empty for no column-level aggregate validation.
        dataframe_validator (Optional[Union[ConstraintWithMetadata, MultiConstraintWithMetadata]]):
                    what dataframe-wide validation you want to have applied.
                    Leave empty for no dataframe-wide validation.
        loader (Optional[DagsterTypeLoader]): An instance of a class that
            inherits from :py:class:`~dagster.DagsterTypeLoader`. If None, we will default
            to using `dataframe_loader`.
        materializer (Optional[DagsterTypeMaterializer]): An instance of a class
            that inherits from :py:class:`~dagster.DagsterTypeMaterializer`. If None, we will
            default to using `dataframe_materializer`.

    Returns:
        a DagsterType with the corresponding name and packaged validation.

    """

    def _dagster_type_check(_, value):
        if not isinstance(value, pd.DataFrame):
            return TypeCheck(
                success=False,
                description="Must be a pandas.DataFrame. Got value of type. {type_name}".format(
                    type_name=type(value).__name__
                ),
            )

        # Run every validator that was supplied, keyed by the level it applies to.
        individual_result_dict = {}
        if dataframe_validator is not None:
            individual_result_dict["dataframe"] = dataframe_validator.validate(value)
        if columns_validator is not None:
            individual_result_dict["columns"] = columns_validator.validate(value)
        if columns_aggregate_validator is not None:
            individual_result_dict["column-aggregates"] = columns_aggregate_validator.validate(
                value
            )

        typechecks_succeeded = True
        metadata = []
        overall_description = "Failed Constraints: {}"
        constraint_clauses = []
        for key, result in individual_result_dict.items():
            result_val = result.success
            if result_val:
                continue
            # Only failing validators contribute metadata and a description clause.
            typechecks_succeeded = typechecks_succeeded and result_val
            result_dict = result.metadata_entries[0].entry_data.data
            metadata.append(
                MetadataEntry(
                    "{}-constraint-metadata".format(key),
                    value=result_dict,
                )
            )
            constraint_clauses.append("{} failing constraints, {}".format(key, result.description))

        # Sorting by label returns aggregates, then column, then dataframe.
        return TypeCheck(
            success=typechecks_succeeded,
            description=overall_description.format(constraint_clauses),
            metadata_entries=sorted(metadata, key=lambda x: x.label),
        )

    description = check.opt_str_param(description, "description", default="")
    return DagsterType(
        name=name,
        type_check_fn=_dagster_type_check,
        loader=loader if loader else dataframe_loader,
        # Select on `materializer` itself; testing `loader` here silently dropped a
        # custom materializer whenever no custom loader was passed.
        materializer=materializer if materializer else dataframe_materializer,
        description=description,
    )


def _execute_summary_stats(type_name, value, event_metadata_fn):
    """Call the user-supplied metadata function on ``value`` and normalize its result.

    Args:
        type_name (str): Name of the dataframe type, used in error messages.
        value: The dataframe handed to ``event_metadata_fn``.
        event_metadata_fn (Optional[Callable]): User function returning either a
            dict of metadata or a list of metadata entries.

    Returns:
        An empty list when ``event_metadata_fn`` is falsy, otherwise the result of
        ``normalize_metadata`` on what the function returned.

    Raises:
        DagsterInvariantViolationError: If the function returns something other
            than a dict or a list, or if normalization rejects the result.
    """
    if not event_metadata_fn:
        return []

    metadata_or_metadata_entries = event_metadata_fn(value)

    # Report what the user function actually returned, not the dataframe it was given.
    invalid_message = (
        "The return value of the user-defined summary_statistics function for pandas "
        f"data frame type {type_name} returned {metadata_or_metadata_entries!r}. "
        "This function must return "
        "Union[Dict[str, Union[str, float, int, Dict, MetadataValue]], List[MetadataEntry]]"
    )

    metadata = None
    metadata_entries = None

    if isinstance(metadata_or_metadata_entries, list):
        metadata_entries = metadata_or_metadata_entries
    elif isinstance(metadata_or_metadata_entries, dict):
        metadata = metadata_or_metadata_entries
    else:
        raise DagsterInvariantViolationError(invalid_message)

    try:
        return normalize_metadata(metadata, metadata_entries)
    except (DagsterInvalidMetadata, CheckError) as err:
        # Chain the cause so the underlying validation failure stays visible.
        raise DagsterInvariantViolationError(invalid_message) from err