Asset sensors allow you to instigate runs when materializations occur.
Name | Description
---|---
RunRequest | The sensor evaluation function can yield one or more run requests. Each run request creates a job run.
SkipReason | If a sensor evaluation doesn't yield any run requests, it can instead yield a skip reason to log why the evaluation was skipped or why there were no events to be processed.
@asset_sensor | The decorator used to define an asset sensor. The decorated function is an evaluation function that takes in a SensorEvaluationContext and an asset materialization event. The decorator returns an AssetSensorDefinition.
AssetSensorDefinition | A special sensor definition class for asset sensors. You almost never want to initialize this class directly. Instead, use the @asset_sensor decorator, which returns an AssetSensorDefinition.
@multi_asset_sensor | The decorator used to define an asset sensor that can monitor multiple assets at a time. The decorated function is an evaluation function that takes in a MultiAssetSensorEvaluationContext, which has methods to fetch materialization events for the monitored assets. The decorator returns a MultiAssetSensorDefinition.
MultiAssetSensorDefinition | A special sensor definition class for multi-asset sensors. You almost never want to initialize this class directly. Instead, use the @multi_asset_sensor decorator, which returns a MultiAssetSensorDefinition.
MultiAssetSensorEvaluationContext | The context object passed to a multi-asset sensor evaluation function. Has methods for fetching materialization events for the monitored assets.
build_multi_asset_sensor_context | A function that constructs an instance of MultiAssetSensorEvaluationContext. Intended for testing multi-asset sensors.
A useful pattern is to create a sensor that checks for new AssetMaterialization
events for a particular asset key. This can be used to kick off a job that computes downstream assets or notifies appropriate stakeholders.
One benefit of this pattern is that it enables cross-job and even cross-repository dependencies. Each job run instigated by an asset sensor is agnostic to the job that caused it.
Dagster provides a special asset sensor definition format for sensors that fire a single RunRequest based on a single asset materialization. Here is an example of a sensor that generates a RunRequest for every materialization of the asset key my_table:
```python
from dagster import AssetKey, EventLogEntry, RunRequest, SensorEvaluationContext, asset_sensor

@asset_sensor(asset_key=AssetKey("my_table"), job=my_job)
def my_asset_sensor(context: SensorEvaluationContext, asset_event: EventLogEntry):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "ops": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.dagster_event.asset_key.path,
                    }
                }
            }
        },
    )
```
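The sensor above targets my_job, which isn't shown. For reference, here is a minimal sketch of what that job might look like. The op name read_materialization and the asset_key config field are taken from the run_config the sensor builds; the job body itself is an assumption, not part of the original example:

```python
from dagster import job, op

# Hypothetical target job for my_asset_sensor. The op name and config schema
# mirror the run_config built by the sensor; adapt the body to your own logic.
@op(config_schema={"asset_key": [str]})
def read_materialization(context):
    context.log.info(f"Processing materialization of {context.op_config['asset_key']}")

@job
def my_job():
    read_materialization()
```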
Multi-asset sensors, which can trigger job executions based on multiple asset materialization event streams, can be handled using the @multi_asset_sensor
decorator.
In the body of the sensor, you have access to the materialization event records for each asset via the MultiAssetSensorEvaluationContext
context object. Methods within the context object will search for events after the cursor.
- MultiAssetSensorEvaluationContext.latest_materialization_records_by_key returns a dictionary mapping the asset key of each monitored asset to its most recent materialization record. If an asset has no materialization event, the mapped value will be None.
- MultiAssetSensorEvaluationContext.materialization_records_for_key returns a list of materialization event records for a specified asset key.

```python
@multi_asset_sensor(
    asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor(context):
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
```
Note the context.advance_all_cursors call near the end of the sensor. The cursor keeps track of which materialization events have been processed by the sensor so that the next time the sensor runs, only newer events are fetched. Since multi-asset sensors provide the flexibility to determine which conditions should result in RunRequests, the sensor must manually update the cursor if a RunRequest is returned.
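To exercise a multi-asset sensor in a test, you can construct the context yourself with build_multi_asset_sensor_context (listed in the API table above) and invoke the sensor directly. Below is a minimal sketch; my_repo is an assumed repository containing the sensor, the job, and the monitored assets, and the parameter names follow recent Dagster releases (they may differ slightly in older versions):

```python
from dagster import AssetKey, DagsterInstance, build_multi_asset_sensor_context

# Minimal test sketch: my_repo is an assumed repository containing
# asset_a_and_b_sensor, my_job, and the monitored assets.
with DagsterInstance.ephemeral() as instance:
    context = build_multi_asset_sensor_context(
        monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")],
        repository_def=my_repo,
        instance=instance,
    )
    # Returns a RunRequest if both assets have unconsumed materializations
    result = asset_a_and_b_sensor(context)
```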
You can also return a SkipReason
to document why the sensor didn't launch a run.
```python
@multi_asset_sensor(
    asset_keys=[AssetKey("asset_a"), AssetKey("asset_b")],
    job=my_job,
)
def asset_a_and_b_sensor_with_skip_reason(context):
    asset_events = context.latest_materialization_records_by_key()
    if all(asset_events.values()):
        context.advance_all_cursors()
        return RunRequest()
    elif any(asset_events.values()):
        materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if value
        ]
        not_materialized_asset_key_strs = [
            key.to_user_string() for key, value in asset_events.items() if not value
        ]
        return SkipReason(
            f"Observed materializations for {materialized_asset_key_strs}, "
            f"but not for {not_materialized_asset_key_strs}"
        )
```
You can use the MultiAssetSensorEvaluationContext
context object to fetch partitioned materializations for each monitored asset.
- MultiAssetSensorEvaluationContext.latest_materialization_records_by_partition returns a mapping of each partition key to the most recent materialization event record for that partition, ordered by event ID.
- MultiAssetSensorEvaluationContext.latest_materialization_records_by_partition_and_asset returns a mapping of each partition key to a mapping of asset key to the most recent materialization record for that partition.

The example below monitors two assets with the same daily partitions definition. Whenever both monitored assets have an unconsumed materialization for a given partition, the sensor kicks off a run for that partition.
```python
@multi_asset_sensor(
    asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
    job=downstream_daily_job,
)
def trigger_daily_asset_if_both_upstream_partitions_materialized(context):
    run_requests = []
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        if set(materializations_by_asset.keys()) == set(context.asset_keys):
            run_requests.append(
                downstream_daily_job.run_request_for_partition(partition)
            )
            for asset_key, materialization in materializations_by_asset.items():
                context.advance_cursor({asset_key: materialization})
    return run_requests
```
Notice that the context.advance_cursor call marks specific materializations as consumed. These materializations will not be returned in future context calls.
At the end of each tick, the cursor object evaluates the set of consumed materializations. For each monitored asset, the cursor automatically stores:

- latest_consumed_event_id, the ID of the newest materialization that was marked as consumed.
- trailing_unconsumed_partitioned_event_ids, the newest unconsumed event ID for up to 25 partitions of each asset. All of these events must be partitioned and older than the latest_consumed_event_id event for the asset.

The events in trailing_unconsumed_partitioned_event_ids will appear in future context calls until they are marked as consumed via a call to advance_cursor. A call to advance_all_cursors will also mark all existing events as consumed.
The following example monitors two upstream daily-partitioned assets, kicking off materializations of the corresponding partition in the downstream daily-partitioned asset. After a partition has been materialized for both upstream assets, the downstream asset can then be materialized and the sensor kicks off a partitioned run. Replacing either upstream partition thereafter will yield a run request for the downstream asset.
```python
@multi_asset_sensor(
    asset_keys=[AssetKey("upstream_daily_1"), AssetKey("upstream_daily_2")],
    job=downstream_daily_job,
)
def trigger_daily_asset_when_any_upstream_partitions_replaced(context):
    run_requests = []
    for (
        partition,
        materializations_by_asset,
    ) in context.latest_materialization_records_by_partition_and_asset().items():
        if all(
            [
                context.all_partitions_materialized(asset_key, [partition])
                for asset_key in context.asset_keys
            ]
        ):
            run_requests.append(
                downstream_daily_job.run_request_for_partition(partition)
            )
            for asset_key, materialization in materializations_by_asset.items():
                if asset_key in context.asset_keys:
                    context.advance_cursor({asset_key: materialization})
    return run_requests
```
If the PartitionsDefinition of the monitored assets differs from that of the triggered asset, you can use the MultiAssetSensorEvaluationContext.get_downstream_partition_keys method to map a partition key from one asset to another. This method accepts a partition key from the upstream asset and uses the PartitionMapping defined on the downstream asset to fetch the corresponding partitions in the downstream asset.
If a partition mapping is not defined, Dagster will use the default partition mapping, which is the TimeWindowPartitionMapping
for time window partitions definitions and the IdentityPartitionMapping
for other partitions definitions. The TimeWindowPartitionMapping
will map an upstream partition to the downstream partitions that overlap with it.
Looking for more? The examples section features another example of updating a weekly asset when upstream daily assets are materialized.
The following code example monitors an upstream daily-partitioned asset, kicking off materializations of a downstream weekly-partitioned asset whenever a daily partition is materialized and all daily partitions in the week have existing materializations. Every time a daily partition is replaced, the weekly partition will materialize.
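For reference, the example assumes daily- and weekly-partitioned assets and a partitioned job along the following lines. This setup is illustrative rather than part of the original example, and it spells out explicitly the TimeWindowPartitionMapping that Dagster would apply by default between two time-window partitions definitions:

```python
from dagster import (
    AssetIn,
    DailyPartitionsDefinition,
    TimeWindowPartitionMapping,
    WeeklyPartitionsDefinition,
    asset,
    define_asset_job,
)

# Illustrative partitions definitions; the start dates are arbitrary.
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")
weekly_partitions = WeeklyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def upstream_daily_1():
    ...

# The explicit partition_mapping below matches the default that Dagster
# would infer between time-window partitions definitions.
@asset(
    partitions_def=weekly_partitions,
    ins={"upstream_daily_1": AssetIn(partition_mapping=TimeWindowPartitionMapping())},
)
def downstream_weekly_asset(upstream_daily_1):
    ...

weekly_asset_job = define_asset_job(
    "weekly_asset_job",
    selection="downstream_weekly_asset",
    partitions_def=weekly_partitions,
)
```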
MultiAssetSensorEvaluationContext.all_partitions_materialized is a utility method that accepts a list of partitions and checks whether every provided partition has been materialized. This method ignores the cursor, so it searches through all existing materializations.

```python
@multi_asset_sensor(asset_keys=[AssetKey("upstream_daily_1")], job=weekly_asset_job)
def trigger_weekly_asset_from_daily_asset(context):
    run_requests_by_partition = {}
    materializations_by_partition = context.latest_materialization_records_by_partition(
        AssetKey("upstream_daily_1")
    )

    # Get all corresponding weekly partitions for any materialized daily partitions
    for partition, materialization in materializations_by_partition.items():
        weekly_partitions = context.get_downstream_partition_keys(
            partition,
            from_asset_key=AssetKey("upstream_daily_1"),
            to_asset_key=AssetKey("downstream_weekly_asset"),
        )

        if weekly_partitions:  # Check that a downstream weekly partition exists
            # An upstream daily partition can map to at most one downstream weekly partition
            daily_partitions_in_week = context.get_downstream_partition_keys(
                weekly_partitions[0],
                from_asset_key=AssetKey("downstream_weekly_asset"),
                to_asset_key=AssetKey("upstream_daily_1"),
            )
            if context.all_partitions_materialized(
                AssetKey("upstream_daily_1"), daily_partitions_in_week
            ):
                run_requests_by_partition[
                    weekly_partitions[0]
                ] = weekly_asset_job.run_request_for_partition(weekly_partitions[0])
                # Advance the cursor so we only check event log records past the cursor
                context.advance_cursor({AssetKey("upstream_daily_1"): materialization})
    return list(run_requests_by_partition.values())
```