Represents all the information required to launch a single run. Must be returned by a SensorDefinition or ScheduleDefinition’s evaluation function for a run to be launched.
To build a run request for a particular partition, use run_request_for_partition().
A string key to identify this launched run. For sensors, ensures that only one run is created per run key across all sensor evaluations. For schedules, ensures that one run is created per tick, across failure recoveries. Passing in a None value means that a run will always be launched per evaluation.
Optional[str]
The config that parameterizes the run execution to be launched, as a dict.
Optional[Dict]
A dictionary of tags (string key-value pairs) to attach to the launched run.
Optional[Dict[str, str]]
(Experimental) The name of the job this run request will launch. Required for sensors that target multiple jobs.
Optional[str]
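For illustration, a minimal sketch of a sensor evaluation function yielding a RunRequest; the job name "my_job", the op name "process_file", and the filename check are all hypothetical:
from dagster import RunRequest, sensor

@sensor(job_name="my_job")  # "my_job" is a hypothetical job name
def my_file_sensor(context):
    filename = "data.csv"  # stand-in for an external state check
    yield RunRequest(
        run_key=filename,  # deduplicates runs across sensor evaluations
        run_config={"ops": {"process_file": {"config": {"filename": filename}}}},
        tags={"source": "my_file_sensor"},
    )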
Creates a schedule following the provided cron schedule and requests runs for the provided job.
The decorated function takes in a ScheduleEvaluationContext as its only argument, and does one of the following:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason).
Return a run config dictionary.
Yield a SkipReason or yield one or more RunRequest objects.
Returns a ScheduleDefinition.
cron_schedule (Union[str, Sequence[str]]) – A valid cron string or sequence of cron strings specifying when the schedule will run, e.g., '45 23 * * 6' for a schedule that runs at 11:45 PM every Saturday. If a sequence is provided, then the schedule will run for the union of all execution times for the provided cron strings, e.g., ['45 23 * * 6', '30 9 * * 0'] for a schedule that runs at 11:45 PM every Saturday and 9:30 AM every Sunday.
name (Optional[str]) – The name of the schedule to create.
tags (Optional[Dict[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Dict[str, str]]]]) – A function that generates tags to attach to the schedule's runs. Takes a ScheduleEvaluationContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags and tags_fn.
should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs at schedule execution time to determine whether a schedule should execute or skip. Takes a ScheduleEvaluationContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.
environment_vars (Optional[Dict[str, str]]) – Any environment variables to set when executing the schedule.
execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
description (Optional[str]) – A human-readable description of the schedule.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job that should execute when this schedule runs.
default_status (DefaultScheduleStatus) – Whether the schedule starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
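A minimal sketch of the decorator in use; the job my_job and op my_op are hypothetical:
from dagster import RunRequest, schedule

@schedule(cron_schedule="45 23 * * 6", job=my_job, execution_timezone="America/Los_Angeles")
def my_saturday_schedule(context):
    # my_job and my_op are hypothetical; scheduled_execution_time comes from the context.
    return RunRequest(
        run_key=None,
        run_config={"ops": {"my_op": {"config": {"date": context.scheduled_execution_time.strftime("%Y-%m-%d")}}}},
    )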
Define a schedule that targets a job.
name (Optional[str]) – The name of the schedule to create. Defaults to the job name plus “_schedule”.
cron_schedule (Union[str, Sequence[str]]) – A valid cron string or sequence of cron strings specifying when the schedule will run, e.g., '45 23 * * 6' for a schedule that runs at 11:45 PM every Saturday. If a sequence is provided, then the schedule will run for the union of all execution times for the provided cron strings, e.g., ['45 23 * * 6', '30 9 * * 0'] for a schedule that runs at 11:45 PM every Saturday and 9:30 AM every Sunday.
execution_fn (Callable[ScheduleEvaluationContext]) – The core evaluation function for the schedule, which is run at an interval to determine whether a run should be launched or not. Takes a ScheduleEvaluationContext. This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
run_config (Optional[Mapping]) – The config that parameterizes this execution, as a dict.
run_config_fn (Optional[Callable[[ScheduleEvaluationContext], [Mapping]]]) – A function that takes a ScheduleEvaluationContext object and returns the run configuration that parameterizes this execution, as a dict. You may set only one of run_config, run_config_fn, and execution_fn.
tags (Optional[Mapping[str, str]]) – A dictionary of tags (string key-value pairs) to attach to the scheduled runs.
tags_fn (Optional[Callable[[ScheduleEvaluationContext], Optional[Mapping[str, str]]]]) – A function that generates tags to attach to the schedule's runs. Takes a ScheduleEvaluationContext and returns a dictionary of tags (string key-value pairs). You may set only one of tags, tags_fn, and execution_fn.
should_execute (Optional[Callable[[ScheduleEvaluationContext], bool]]) – A function that runs at schedule execution time to determine whether a schedule should execute or skip. Takes a ScheduleEvaluationContext and returns a boolean (True if the schedule should execute). Defaults to a function that always returns True.
environment_vars (Optional[dict[str, str]]) – The environment variables to set for the schedule.
execution_timezone (Optional[str]) – Timezone in which the schedule should run. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
description (Optional[str]) – A human-readable description of the schedule.
job (Optional[Union[GraphDefinition, JobDefinition]]) – The job that should execute when this schedule runs.
default_status (DefaultScheduleStatus) – Whether the schedule starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
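As a sketch, a ScheduleDefinition constructed directly with an execution_fn generator; the job my_job is hypothetical:
from dagster import RunRequest, ScheduleDefinition, SkipReason

def _evaluate(context):
    # Yield either a SkipReason or one or more RunRequests, per the contract above.
    if context.scheduled_execution_time is None:
        yield SkipReason("No scheduled execution time available.")
    else:
        yield RunRequest(run_key=None, run_config={})

my_schedule = ScheduleDefinition(
    name="my_schedule",
    cron_schedule="0 9 * * *",
    job=my_job,  # hypothetical job
    execution_fn=_evaluate,
)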
The context object available as the first argument to various functions defined on a dagster.ScheduleDefinition.
A ScheduleEvaluationContext object is passed as the first argument to run_config_fn, tags_fn, and should_execute.
Users should not instantiate this object directly. To construct a ScheduleEvaluationContext for testing purposes, use dagster.build_schedule_context().
The serialized instance configured to run the schedule.
Optional[InstanceRef]
The time in which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed. Not available in all schedulers - currently only set in deployments using DagsterDaemonScheduler.
datetime
Example:
from dagster import schedule, ScheduleEvaluationContext

@schedule
def the_schedule(context: ScheduleEvaluationContext):
    ...
Builds schedule execution context using the provided parameters.
The instance provided to build_schedule_context must be persistent; DagsterInstance.ephemeral() will result in an error.
instance (Optional[DagsterInstance]) – The dagster instance configured to run the schedule.
scheduled_execution_time (datetime) – The time in which the execution was scheduled to happen. May differ slightly from both the actual execution time and the time at which the run config is computed.
Examples
context = build_schedule_context(instance)
daily_schedule.evaluate_tick(context)
For partitioned schedules, controls the maximum number of past partitions for each schedule that will be considered when looking for missing runs. Generally this parameter will only come into play if the scheduler falls behind or launches after experiencing downtime. This parameter will not be checked for schedules without partition sets (for example, schedules created using the @schedule decorator) - only the most recent execution time will be considered for those schedules.
Note that no matter what this value is, the scheduler will never launch a run from a time before the schedule was turned on (even if the start_date on the schedule is earlier) - if you want to launch runs for earlier partitions, launch a backfill.
Default Value: 5
For each schedule tick that raises an error, how many times to retry that tick.
Default Value: 0
Default scheduler implementation that submits runs from the dagster-daemon long-lived process. Periodically checks each running schedule for execution times that don’t have runs yet and launches them.
Creates a schedule from a time window-partitioned job or a job that targets time window-partitioned assets.
The schedule executes at the cadence specified by the partitioning of the job or assets.
Examples
######################################
# Job that targets partitioned assets
######################################
from dagster import (
    DailyPartitionsDefinition,
    Definitions,
    asset,
    build_schedule_from_partitioned_job,
    define_asset_job,
)

@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def asset1():
    ...

asset1_job = define_asset_job("asset1_job", selection=[asset1])

# The created schedule will fire daily
asset1_job_schedule = build_schedule_from_partitioned_job(asset1_job)

defs = Definitions(assets=[asset1], schedules=[asset1_job_schedule])
################
# Non-asset job
################
from dagster import DailyPartitionsDefinition, Definitions, build_schedule_from_partitioned_job, job

@job(partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"))
def do_stuff_partitioned():
    ...

# The created schedule will fire daily
do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)

defs = Definitions(schedules=[do_stuff_partitioned_schedule])
Defines run config over a set of hourly partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset is provided, the start and end times of each partition will be minute_offset past the hour.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can be provided in either datetime or string format.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.
@hourly_partitioned_config(start_date=datetime(2022, 3, 12))
# creates partitions (2022-03-12-00:00, 2022-03-12-01:00), (2022-03-12-01:00, 2022-03-12-02:00), ...

@hourly_partitioned_config(start_date=datetime(2022, 3, 12), minute_offset=15)
# creates partitions (2022-03-12-00:15, 2022-03-12-01:15), (2022-03-12-01:15, 2022-03-12-02:15), ...
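A runnable sketch of the decorated function itself; the op name "process_hour" is hypothetical:
from datetime import datetime

from dagster import hourly_partitioned_config

@hourly_partitioned_config(start_date=datetime(2022, 3, 12))
def my_hourly_config(start: datetime, end: datetime):
    # Returns ordinary run config; "process_hour" is a hypothetical op name.
    return {
        "ops": {
            "process_hour": {
                "config": {
                    "start": start.strftime("%Y-%m-%d %H:%M"),
                    "end": end.strftime("%Y-%m-%d %H:%M"),
                }
            }
        }
    }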
Defines run config over a set of daily partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date at midnight. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions. Can be provided in either datetime or string format.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.
@daily_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-12-00:00, 2022-03-13-00:00), (2022-03-13-00:00, 2022-03-14-00:00), ...
@daily_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=16)
# creates partitions (2022-03-12-16:15, 2022-03-13-16:15), (2022-03-13-16:15, 2022-03-14-16:15), ...
Defines run config over a set of weekly partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at the start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day of the week corresponding to day_offset (0 indexed with Sunday as the start of the week). If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be the Sunday at midnight following start_date. Can be provided in either datetime or string format.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
day_offset (int) – Day of the week to “split” the partition. Defaults to 0 (Sunday).
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.
@weekly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-03-13-00:00, 2022-03-20-00:00), (2022-03-20-00:00, 2022-03-27-00:00), ...
@weekly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=6)
# creates partitions (2022-03-12-03:15, 2022-03-19-03:15), (2022-03-19-03:15, 2022-03-26-03:15), ...
Defines run config over a set of monthly partitions.
The decorated function should accept a start datetime and end datetime, which represent the bounds of the date partition the config should delineate.
The decorated function should return a run config dictionary.
The resulting object created by this decorator can be provided to the config argument of a Job. The first partition in the set will start at midnight on the soonest first of the month after start_date. The last partition in the set will end before the current time, unless the end_offset argument is set to a positive number. If day_offset is provided, the start and end date of each partition will be day_offset. If minute_offset and/or hour_offset are used, the start and end times of each partition will be hour_offset:minute_offset of each day.
start_date (Union[datetime.datetime, str]) – The first date in the set of partitions will be midnight on the soonest first of the month following start_date. Can be provided in either datetime or string format.
minute_offset (int) – Number of minutes past the hour to “split” the partition. Defaults to 0.
hour_offset (int) – Number of hours past 00:00 to “split” the partition. Defaults to 0.
day_offset (int) – Day of the month to “split” the partition. Defaults to 1.
timezone (Optional[str]) – The timezone in which each date should exist. Supported strings for timezones are the ones provided by the IANA time zone database <https://www.iana.org/time-zones> - e.g. “America/Los_Angeles”.
fmt (Optional[str]) – The date format to use. Defaults to %Y-%m-%d.
end_offset (int) – Extends the partition set by a number of partitions equal to the value passed. If end_offset is 0 (the default), the last partition ends before the current time. If end_offset is 1, the second-to-last partition ends before the current time, and so on.
tags_for_partition_fn (Optional[Callable[[str], Mapping[str, str]]]) – A function that accepts a partition time window and returns a dictionary of tags to attach to runs for that partition.
@monthly_partitioned_config(start_date="2022-03-12")
# creates partitions (2022-04-01-00:00, 2022-05-01-00:00), (2022-05-01-00:00, 2022-06-01-00:00), ...
@monthly_partitioned_config(start_date="2022-03-12", minute_offset=15, hour_offset=3, day_offset=5)
# creates partitions (2022-04-05-03:15, 2022-05-05-03:15), (2022-05-05-03:15, 2022-06-05-03:15), ...
Creates a sensor where the decorated function is used as the sensor’s evaluation function. The decorated function may:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason).
Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job to be executed when the sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
asset_selection (AssetSelection) – (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
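A sketch showing the skip path alongside run requests; new_files() is a hypothetical stand-in for an external check, and "my_job" is a hypothetical job name:
from dagster import RunRequest, SkipReason, sensor

def new_files():
    # Hypothetical stand-in for an external check, e.g. listing a directory.
    return []

@sensor(job_name="my_job")  # "my_job" is a hypothetical job name
def my_directory_sensor(context):
    files = new_files()
    if not files:
        return SkipReason("No new files found.")
    return [RunRequest(run_key=f) for f in files]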
Define a sensor that initiates a set of runs based on some external state.
evaluation_fn (Callable[[SensorEvaluationContext]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext. This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
name (Optional[str]) – The name of the sensor to create. Defaults to the name of evaluation_fn.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[GraphDefinition, JobDefinition, UnresolvedAssetJob]) – The job to execute when this sensor fires.
jobs (Optional[Sequence[GraphDefinition, JobDefinition, UnresolvedAssetJob]]) – (experimental) A list of jobs to execute when this sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
asset_selection (AssetSelection) – (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
The context object available as the argument to the evaluation function of a dagster.SensorDefinition.
Users should not instantiate this object directly. To construct a SensorEvaluationContext for testing purposes, use dagster.build_sensor_context().
The serialized instance configured to run the sensor.
Optional[InstanceRef]
The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest.
Optional[str]
DEPRECATED The last time that the sensor was evaluated (UTC).
float
DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
str
The name of the repository that the sensor belongs to.
Optional[str]
The deserialized instance can also be passed in directly (primarily useful in testing contexts).
Optional[DagsterInstance]
Example:
from dagster import sensor, SensorEvaluationContext

@sensor
def the_sensor(context: SensorEvaluationContext):
    ...
The cursor value for this sensor, which was set in an earlier sensor evaluation.
Updates the cursor value for this sensor, which will be provided on the context for the next sensor evaluation.
This can be used to keep track of progress and avoid duplicate work across sensor evaluations.
cursor (Optional[str]) – The new cursor value to set, which will be made available on the context of the next sensor evaluation.
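A sketch of the cursor round-trip across evaluations; the count-based cursor and "my_job" are purely illustrative:
from dagster import RunRequest, sensor

@sensor(job_name="my_job")  # "my_job" is a hypothetical job name
def counting_sensor(context):
    last_count = int(context.cursor) if context.cursor else 0
    current_count = last_count + 1  # stand-in for some external measurement
    if current_count > last_count:
        # Persist progress so the next evaluation starts from here.
        context.update_cursor(str(current_count))
        yield RunRequest(run_key=str(current_count))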
Builds sensor execution context using the provided parameters.
This function can be used to provide a context to the invocation of a sensor definition. If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.
instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.
cursor (Optional[str]) – A cursor value to provide to the evaluation of the sensor.
repository_name (Optional[str]) – The name of the repository that the sensor belongs to.
repository_def (Optional[RepositoryDefinition]) – The repository that the sensor belongs to.
Examples
context = build_sensor_context()
my_sensor(context)
Creates an asset sensor where the decorated function is used as the asset sensor’s evaluation function. The decorated function may:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason).
Yield a SkipReason or yield one or more RunRequest objects.
Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event.
asset_key (AssetKey) – The asset_key this sensor monitors.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job to be executed when the sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Example
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    return RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.dagster_event.asset_key.path,
                    }
                }
            }
        },
    )
Define an asset sensor that initiates a set of runs based on the materialization of a given asset.
name (str) – The name of the sensor to create.
asset_key (AssetKey) – The asset_key this sensor monitors.
asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a SensorEvaluationContext and an EventLogEntry corresponding to an AssetMaterialization event. This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job object to target with this sensor.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every tick for each asset in the selection that has a FreshnessPolicy defined.
Note: returning or yielding a value from the annotated function will result in an error.
Takes a FreshnessPolicySensorContext.
asset_selection (AssetSelection) – The asset selection monitored by the sensor.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]) – The core evaluation function for the sensor. Takes a FreshnessPolicySensorContext.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
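A minimal sketch; the send_alert helper and the 10-minute lateness threshold are hypothetical:
from dagster import AssetSelection, FreshnessPolicySensorContext, freshness_policy_sensor

def send_alert(message: str) -> None:
    # Hypothetical alerting helper; replace with e.g. a Slack or email call.
    print(message)

@freshness_policy_sensor(asset_selection=AssetSelection.all())
def freshness_alert_sensor(context: FreshnessPolicySensorContext):
    if context.minutes_late is None or context.previous_minutes_late is None:
        return
    # Alert only when the asset first crosses the lateness threshold.
    if context.minutes_late >= 10 and context.previous_minutes_late < 10:
        send_alert(f"{context.asset_key} is {context.minutes_late} minutes late")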
Define a sensor that reacts to the status of a given set of asset freshness policies, where the decorated function will be evaluated on every sensor tick.
name (str) – The name of the sensor. Defaults to the name of the decorated function.
freshness_policy_sensor_fn (Callable[[FreshnessPolicySensorContext], None]) – The core evaluation function for the sensor. Takes a FreshnessPolicySensorContext.
asset_selection (AssetSelection) – The asset selection monitored by the sensor.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
The context object available to a decorated function of freshness_policy_sensor.
the name of the sensor.
str
the key of the asset being monitored.
AssetKey
the freshness policy of the asset being monitored.
FreshnessPolicy
how late the monitored asset currently is, in minutes.
Optional[float]
the minutes_late value for this asset on the previous sensor tick.
Optional[float]
the current instance.
DagsterInstance
Builds freshness policy sensor context from provided parameters.
This function can be used to provide the context argument when directly invoking a function decorated with @freshness_policy_sensor, such as when writing unit tests.
sensor_name (str) – The name of the sensor the context is being constructed for.
asset_key (AssetKey) – The AssetKey for the monitored asset.
freshness_policy (FreshnessPolicy) – The FreshnessPolicy for the monitored asset.
minutes_late (Optional[float]) – How late the monitored asset currently is.
previous_minutes_late (Optional[float]) – How late the monitored asset was on the previous tick.
instance (DagsterInstance) – The dagster instance configured for the context.
Examples
context = build_freshness_policy_sensor_context(
    sensor_name="freshness_policy_sensor_to_invoke",
    asset_key=AssetKey("some_asset"),
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=30),
    minutes_late=10.0,
)

freshness_policy_sensor_to_invoke(context)
Creates an asset sensor that can monitor multiple assets.
The decorated function is used as the asset sensor’s evaluation function. The decorated function may:
Return a RunRequest object.
Return a list of RunRequest objects.
Return a SkipReason object, providing a descriptive message of why no runs were requested.
Return nothing (skipping without providing a reason).
Yield a SkipReason or yield one or more RunRequest objects.
Takes a MultiAssetSensorEvaluationContext.
monitored_assets (Union[Sequence[AssetKey], AssetSelection]) – The assets this sensor monitors. If an AssetSelection object is provided, it will only apply to assets within the Definitions that this sensor is part of.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job to be executed when the sensor fires.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
request_assets (Optional[AssetSelection]) – (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
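A sketch that requests a run only once both monitored assets have unconsumed materializations; my_job is a hypothetical job:
from dagster import AssetKey, MultiAssetSensorEvaluationContext, RunRequest, multi_asset_sensor

@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,  # hypothetical job
)
def both_assets_sensor(context: MultiAssetSensorEvaluationContext):
    records = context.latest_materialization_records_by_key()
    if all(records.values()):
        # Mark both events consumed so they are not returned on the next tick.
        context.advance_all_cursors()
        return RunRequest(run_key=None)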
Define an asset sensor that initiates a set of runs based on the materialization of a list of assets.
Users should not instantiate this object directly. To construct a MultiAssetSensorDefinition, use dagster.multi_asset_sensor().
name (str) – The name of the sensor to create.
asset_keys (Sequence[AssetKey]) – The asset_keys this sensor monitors.
asset_materialization_fn (Callable[[MultiAssetSensorEvaluationContext], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]) – The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a MultiAssetSensorEvaluationContext. This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job object to target with this sensor.
jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed when the sensor fires.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
request_assets (Optional[AssetSelection]) – (Experimental) an asset selection to launch a run for if the sensor condition is met. This can be provided instead of specifying a job.
The context object available as the argument to the evaluation function of a dagster.MultiAssetSensorDefinition.
Users should not instantiate this object directly. To construct a MultiAssetSensorEvaluationContext for testing purposes, use dagster.build_multi_asset_sensor_context().
The MultiAssetSensorEvaluationContext contains a cursor object that tracks the state of consumed event logs for each monitored asset. For each asset, the cursor stores the storage ID of the latest materialization that has been marked as “consumed” (via a call to advance_cursor) in a latest_consumed_event_id field.
For each monitored asset, the cursor will store the latest unconsumed event ID for up to 25 partitions. Each event ID must be before the latest_consumed_event_id field for the asset.
Events that have not yet been marked as consumed will be returned in future ticks until they are marked as consumed via advance_cursor.
To update the cursor to the latest materialization and clear the unconsumed events, call advance_all_cursors.
The assets monitored by the sensor. If an AssetSelection object is provided, it will only apply to assets within the Definitions that this sensor is part of.
Union[Sequence[AssetKey], AssetSelection]
The repository that the sensor belongs to.
The serialized instance configured to run the sensor.
Optional[InstanceRef]
The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest. Must be a dictionary of asset key strings to a stringified tuple of (latest_event_partition, latest_event_storage_id, trailing_unconsumed_partitioned_event_ids).
Optional[str]
DEPRECATED The last time that the sensor was evaluated (UTC).
float
DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
str
The name of the repository that the sensor belongs to.
Optional[str]
The deserialized instance can also be passed in directly (primarily useful in testing contexts).
Optional[DagsterInstance]
Example:
from dagster import AssetKey, multi_asset_sensor, MultiAssetSensorEvaluationContext

@multi_asset_sensor(monitored_assets=[AssetKey("asset_1"), AssetKey("asset_2")])
def the_sensor(context: MultiAssetSensorEvaluationContext):
    ...
Updates the cursor to the most recent materialization event for all assets monitored by the multi_asset_sensor.
Marks all materialization events as consumed by the sensor, including unconsumed events.
Marks the provided materialization records as having been consumed by the sensor.
At the end of the tick, the cursor will be updated to advance past all materialization records provided via advance_cursor. In the next tick, records that have been consumed will no longer be returned.
Passing a partitioned materialization record into this function will mark prior materializations with the same asset key and partition as having been consumed.
materialization_records_by_key (Mapping[AssetKey, Optional[EventLogRecord]]) – Mapping of AssetKeys to EventLogRecord or None. If an EventLogRecord is provided, the cursor for the AssetKey will be updated and future calls to fetch asset materialization events will not fetch this event again. If None is provided, the cursor for the AssetKey will not be updated.
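A sketch of consuming only one asset's event via advance_cursor, leaving the other asset's event to be returned again on a later tick; my_job is a hypothetical job:
from dagster import AssetKey, RunRequest, multi_asset_sensor

@multi_asset_sensor(
    monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,  # hypothetical job
)
def partial_consume_sensor(context):
    records = context.latest_materialization_records_by_key()
    record_a = records.get(AssetKey("asset_a"))
    if record_a is not None:
        # Consume only asset_a's event; any unconsumed asset_b event
        # will be returned again on the next tick.
        context.advance_cursor({AssetKey("asset_a"): record_a})
        return RunRequest(run_key=None)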
A utility method to check whether a provided list of partitions has been materialized for a particular asset. This method ignores the cursor and checks all materializations for the asset.
asset_key (AssetKey) – The asset to check partitions for.
partitions (Optional[Sequence[str]]) – A list of partitions to check. If not provided, all partitions for the asset will be checked.
True if all selected partitions have been materialized, False otherwise.
bool
A utility method to get the current partition the cursor is on.
Converts a partition key from one asset to the corresponding partition key in a downstream asset. Uses the existing partition mapping between the upstream asset and the downstream asset if it exists, otherwise, uses the default partition mapping.
partition_key (str) – The partition key to convert.
from_asset_key (AssetKey) – The asset key of the upstream asset, which the provided partition key belongs to.
to_asset_key (AssetKey) – The asset key of the downstream asset. The provided partition key will be mapped to partitions within this asset.
A sequence of the corresponding downstream partition keys that the given partition_key maps to.
Sequence[str]
Fetches the unconsumed events for a given asset key. Returns only events before the latest consumed event ID for the given asset. To mark an event as consumed, pass the event to advance_cursor. Returns events in ascending order by storage ID.
asset_key (AssetKey) – The asset key to get unconsumed events for.
The unconsumed events for the given asset key.
Sequence[EventLogRecord]
Fetches the most recent materialization event record for each asset in asset_keys. Only fetches events after the latest consumed event ID for the given asset key.
asset_keys (Optional[Sequence[AssetKey]]) – list of asset keys to fetch events for. If not specified, the latest materialization will be fetched for all assets the multi_asset_sensor monitors.
Mapping of AssetKey to EventLogRecord, where the EventLogRecord is the latest materialization event for the asset. If there is no materialization event for the asset, the value in the mapping will be None.
Given an asset, returns a mapping of partition key to the latest materialization event for that partition. Fetches only materializations that have not been marked as “consumed” via a call to advance_cursor.
asset_key (AssetKey) – The asset to fetch events for.
after_cursor_partition (Optional[bool]) – If True, only materializations with partitions after the cursor’s current partition will be returned. By default, set to False.
Mapping of partition key to EventLogRecord, where each EventLogRecord is the most recent materialization event for that partition. The mapping preserves the order in which the materializations occurred.
Mapping[str, EventLogRecord]
Example
@asset(partitions_def=DailyPartitionsDefinition("2022-07-01"))
def july_asset():
    return 1

@multi_asset_sensor(monitored_assets=[july_asset.key])
def my_sensor(context):
    context.latest_materialization_records_by_partition(july_asset.key)

# After materializing july_asset for 2022-07-05, latest_materialization_records_by_partition
# returns {"2022-07-05": EventLogRecord(...)}
Finds the most recent unconsumed materialization for each partition for each asset monitored by the sensor. Aggregates all materializations into a mapping of partition key to a mapping of asset key to the materialization event for that partition.
For example, if the sensor monitors two partitioned assets A and B that are materialized for partition_x after the cursor, this function returns:
{ "partition_x": {asset_a.key: EventLogRecord(...), asset_b.key: EventLogRecord(...)} }
This method can only be called when all monitored assets are partitioned and share the same partition definition.
Fetches asset materialization event records for asset_key, with the earliest event first.
Only fetches events after the latest consumed event ID for the given asset key.
asset_key (AssetKey) – The asset to fetch materialization events for.
limit (int) – The number of events to fetch.
Builds multi asset sensor execution context for testing purposes using the provided parameters.
This function can be used to provide a context to the invocation of a multi asset sensor definition. If provided, the dagster instance must be persistent; DagsterInstance.ephemeral() will result in an error.
repository_def (RepositoryDefinition) – The repository definition that the sensor belongs to.
monitored_assets (Union[Sequence[AssetKey], AssetSelection]) – The assets monitored by the sensor. If an AssetSelection object is provided, it will only apply to assets within the Definitions that this sensor is part of.
instance (Optional[DagsterInstance]) – The dagster instance configured to run the sensor.
cursor (Optional[str]) – A string cursor to provide to the evaluation of the sensor. Must be a dictionary of asset key strings to ints that has been converted to a JSON string.
repository_name (Optional[str]) – The name of the repository that the sensor belongs to.
cursor_from_latest_materializations (bool) – If True, the cursor will be set to the latest materialization for each monitored asset. By default, set to False.
Examples
with instance_for_test() as instance:
    context = build_multi_asset_sensor_context(
        monitored_assets=[AssetKey("asset_1"), AssetKey("asset_2")],
        instance=instance,
    )
    my_asset_sensor(context)
Constructs a sensor that will monitor the provided assets and launch materializations to “reconcile” them.
An asset is considered “unreconciled” if any of:
This sensor has never tried to materialize it and it has never been materialized.
Any of its parents have been materialized more recently than it has.
Any of its parents are unreconciled.
It is not currently up to date with respect to its freshness policy.
The sensor won’t try to reconcile any assets before their parents are reconciled. When multiple FreshnessPolicies require data from the same upstream assets, this sensor will attempt to launch a minimal number of runs of that asset to satisfy all constraints.
asset_selection (AssetSelection) – The group of assets you want to keep up-to-date.
name (str) – The name to give the sensor.
minimum_interval_seconds (Optional[int]) – The minimum amount of time that should elapse between sensor invocations.
description (Optional[str]) – A description for the sensor.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
run_tags (Optional[Mapping[str, str]]) – Dictionary of tags to pass to the RunRequests launched by this sensor.
SensorDefinition
Example
If you have the following asset graph, with no freshness policies:
a   b   c
 \ / \ /
  d   e
   \ /
    f
and create the sensor:
build_asset_reconciliation_sensor(
    AssetSelection.assets(d, e, f),
    name="my_reconciliation_sensor",
)
If a, b, and c are all materialized, then on the next sensor tick, the sensor will see that d and e can be materialized. Since d and e will be materialized, f can also be materialized. The sensor will kick off a run that will materialize d, e, and f.
If, on the next sensor tick, none of a, b, and c have been materialized again, the sensor will not launch a run.
If, before the next sensor tick, just assets a and b have been materialized, the sensor will launch a run to materialize d, e, and f, because they're downstream of a and b. Even though c hasn't been materialized, the downstream assets can still be updated, because c is still considered “reconciled”.
Example
If asset c has the following freshness policy, meaning that by 2 AM, c needs to be materialized with data from a and b that is no more than 120 minutes old (i.e., all of yesterday's data):

c: FreshnessPolicy(maximum_lag_minutes=120, cron_schedule="0 2 * * *")

a   b
 \ /
  c
and create the sensor:
build_asset_reconciliation_sensor(
    AssetSelection.all(),
    name="my_reconciliation_sensor",
)
Assume that c currently has incorporated all source data up to 2022-01-01 23:00.

At any time between 2022-01-02 00:00 and 2022-01-02 02:00, the sensor will see that c will soon require data from 2022-01-02 00:00. In order to satisfy this requirement, there must be a materialization for both a and b with time >= 2022-01-02 00:00. If such a materialization does not exist for one of those assets, the missing asset(s) will be executed on this tick, to help satisfy the constraint imposed by c. Materializing c in the same run as those assets will satisfy its required data constraint, and so the sensor will kick off a run for c alongside whichever upstream assets did not have up-to-date data.
On the next tick, the sensor will see that a run is currently planned which will satisfy that constraint, so no runs will be kicked off.
Once that run completes, a new materialization event will exist for c, which will incorporate all of the required data, so no new runs will be kicked off until the following day.
Define a sensor that reacts to a given status of pipeline execution, where the decorated function will be evaluated when a run is at the given status.
name (str) – The name of the sensor. Defaults to the name of the decorated function.
run_status (DagsterRunStatus) – The status of a run which will be monitored by the sensor.
run_status_sensor_fn (Callable[[RunStatusSensorContext], Union[SkipReason, PipelineRunReaction]]) – The core evaluation function for the sensor. Takes a RunStatusSensorContext.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, JobSelector, RepositorySelector, CodeLocationSelector]]]) – The jobs in the current repository that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
monitor_all_repositories (bool) – If set to True, the sensor will monitor all runs in the Dagster instance. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
request_job (Optional[Union[GraphDefinition, JobDefinition]]) – The job a RunRequest should execute if yielded from the sensor.
request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition]]]) – (experimental) A list of jobs to be executed if RunRequests are yielded from the sensor.
The context object available to a decorated function of run_status_sensor.
the name of the sensor.
str
the run of the job or pipeline.
the event associated with the job or pipeline run status.
the current instance.
the logger for the given sensor evaluation.
logging.Logger
The context object available to a decorated function of run_failure_sensor.
the name of the sensor.
str
the failed pipeline run.
the pipeline failure event.
Builds run status sensor context from provided parameters.
This function can be used to provide the context argument when directly invoking a function decorated with @run_status_sensor or @run_failure_sensor, such as when writing unit tests.
sensor_name (str) – The name of the sensor the context is being constructed for.
dagster_event (DagsterEvent) – A DagsterEvent with the same event type as the one that triggers the run_status_sensor.
dagster_instance (DagsterInstance) – The dagster instance configured for the context.
dagster_run (DagsterRun) – DagsterRun object from running a job
Examples
instance = DagsterInstance.ephemeral()
result = my_job.execute_in_process(instance=instance)

dagster_run = result.dagster_run
dagster_event = result.get_job_success_event()  # or get_job_failure_event()

context = build_run_status_sensor_context(
    sensor_name="run_status_sensor_to_invoke",
    dagster_instance=instance,
    dagster_run=dagster_run,
    dagster_event=dagster_event,
)

run_status_sensor_to_invoke(context)
Creates a sensor that reacts to a given status of pipeline execution, where the decorated function will be run when a pipeline is at the given status.
Takes a RunStatusSensorContext.
run_status (DagsterRunStatus) – The status of run execution which will be monitored by the sensor.
name (Optional[str]) – The name of the sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
monitored_jobs (Optional[List[Union[PipelineDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – Jobs in the current repository that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository matches the requested run_status. Jobs in external repositories can be monitored by using RepositorySelector or JobSelector.
monitor_all_repositories (bool) – If set to True, the sensor will monitor all runs in the Dagster instance. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
job_selection (Optional[List[Union[PipelineDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – (deprecated in favor of monitored_jobs) Jobs in the current repository that will be monitored by this sensor. Defaults to None, which means the alert will be sent when any job in the repository matches the requested run_status.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
request_job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]) – The job that should be executed if a RunRequest is yielded from the sensor.
request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]) – (experimental) A list of jobs to be executed if RunRequests are yielded from the sensor.
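A sketch of a success-status sensor; the print call stands in for a real notification integration:
from dagster import DagsterRunStatus, RunStatusSensorContext, run_status_sensor

@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def report_success(context: RunStatusSensorContext):
    # context.dagster_run and context.dagster_event describe the finished run.
    print(f"Run {context.dagster_run.run_id} succeeded.")  # stand-in for e.g. a Slack alert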
Creates a sensor that reacts to job failure events, where the decorated function will be run when a run fails.
Takes a RunFailureSensorContext.
name (Optional[str]) – The name of the job failure sensor. Defaults to the name of the decorated function.
minimum_interval_seconds (Optional[int]) – The minimum number of seconds that will elapse between sensor evaluations.
description (Optional[str]) – A human-readable description of the sensor.
monitored_jobs (Optional[List[Union[JobDefinition, GraphDefinition, UnresolvedAssetJobDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – The jobs in the current repository that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the current repository fails.
monitor_all_repositories (bool) – If set to True, the sensor will monitor all runs in the Dagster instance. If set to True, an error will be raised if you also specify monitored_jobs or job_selection. Defaults to False.
job_selection (Optional[List[Union[JobDefinition, GraphDefinition, RepositorySelector, JobSelector, CodeLocationSelector]]]) – (deprecated in favor of monitored_jobs) The jobs in the current repository that will be monitored by this failure sensor. Defaults to None, which means the alert will be sent when any job in the repository fails.
default_status (DefaultSensorStatus) – Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API.
request_job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJob]]) – The job a RunRequest should execute if yielded from the sensor.
request_jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJob]]]) – (experimental) A list of jobs to be executed if RunRequests are yielded from the sensor.
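A sketch of a failure sensor; the print call stands in for a real alerting integration:
from dagster import RunFailureSensorContext, run_failure_sensor

@run_failure_sensor
def report_failure(context: RunFailureSensorContext):
    # context.dagster_run is the failed run; context.failure_event is the failure event.
    print(f"Run {context.dagster_run.run_id} failed: {context.failure_event.message}")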