Sensor
A SensorOperator is an Operator that blocks our DAG by repeatedly checking a certain condition until that condition is met.
Getting Started
FileSensor
Note
FileSensor is a sensor that keeps checking whether the target file exists.
This is an example of using the FileSensor to check /home/hello.txt. The task waiting_for_file will keep running until the target file exists.
from airflow.models import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id='medium_file_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1
):
    start = EmptyOperator(task_id='start')
    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        filepath='/home/hello.txt'
    )
    end = EmptyOperator(task_id='end')

    start >> waiting_for_file >> end
DateTimeSensor
Note
DateTimeSensor is a sensor that keeps checking whether the current time has passed the target datetime.
This is an example of using the DateTimeSensor to check whether the current time has passed 2024-03-10 4:35 PM (UTC+7). The task waiting_for_datetime will keep running until the target time has passed.
from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id='medium_datetime_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1
):
    start = EmptyOperator(task_id='start')
    waiting_for_datetime = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time=datetime(2024, 3, 10, 16, 36, tz='Asia/Bangkok')
    )
    end = EmptyOperator(task_id='end')

    start >> waiting_for_datetime >> end
PythonSensor
Note
PythonSensor is a sensor that executes a Python callable returning a Boolean value; if it returns True, the DAG proceeds to the next step.
Note
Additionally, PythonSensor is also able to pass a value to Airflow's XCom.
This is an example of how to use the PythonSensor to check whether the current time has passed 2024-03-10 4:35 PM (UTC+7), just like DateTimeSensor, and it will also send the string hello world to Airflow's XCom for the next task.
from airflow.models import DAG
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue
from airflow.operators.empty import EmptyOperator
from pendulum import datetime, now

with DAG(
    dag_id='medium_python_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1
):
    start = EmptyOperator(task_id='start')

    @task.sensor(task_id='check_datetime_python')
    def check_datetime_python_task() -> PokeReturnValue:
        # Check whether current time >= target time
        condition_met = now() >= datetime(2024, 3, 10, 16, 36, tz='Asia/Bangkok')
        if condition_met:
            # Return something
            operator_return_value = 'hello world'
        else:
            # Return None if the condition isn't met
            operator_return_value = None
        # Return the poke value
        return PokeReturnValue(
            is_done=condition_met,
            xcom_value=operator_return_value,
        )

    @task(task_id='print_value')
    def print_value_task(content):
        print(content)

    check_datetime_python = check_datetime_python_task()
    print_value = print_value_task(check_datetime_python)

    # End
    end = EmptyOperator(task_id='end')

    # Set dependency flow
    start >> check_datetime_python >> print_value >> end
ExternalTaskSensor
Note
ExternalTaskSensor is a sensor that keeps checking one of these:
- Whether a certain task in the upstream DAG has finished.
- Whether the upstream DAG has finished.
Note
*The DAG run date of both the upstream DAG and the sensor must be the same.
This is an example of using the ExternalTaskSensor to check whether the upstream DAG named medium_datetime_sensor from the previous example has finished.
One good thing about this sensor is that we can be redirected to the upstream DAG using the External DAG button in the UI.
from airflow.models import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id='medium_external_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1
):
    start = EmptyOperator(task_id='start')
    waiting_for_upstream = ExternalTaskSensor(
        task_id='waiting_for_upstream',
        external_dag_id='medium_datetime_sensor',
        # None to wait for the whole DAG, a task_id to wait for a specific task
        external_task_id=None
    )
    end = EmptyOperator(task_id='end')

    start >> waiting_for_upstream >> end
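If we only need one specific upstream task rather than the whole DAG, we can set external_task_id to that task's id instead of None. A minimal sketch of the sensor fragment, assuming the upstream DAG medium_datetime_sensor still contains a task named waiting_for_datetime as in the earlier example:

```python
waiting_for_upstream_task = ExternalTaskSensor(
    task_id='waiting_for_upstream_task',
    external_dag_id='medium_datetime_sensor',
    # Wait only for this specific task in the upstream DAG
    external_task_id='waiting_for_datetime'
)
```

This succeeds as soon as that one task reaches a successful state, even if other tasks in the upstream DAG are still running.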
Warning
Something to be aware of: by default, ExternalTaskSensor checks the upstream DAG's status only when the current DAG and the upstream DAG have exactly the same DAG execution date. But we can adjust this with the execution_date_fn parameter.
This is an example if we want the current DAG to check the upstream DAG run from the previous day.
waiting_for_upstream = ExternalTaskSensor(
    task_id='waiting_for_upstream',
    external_dag_id='medium_datetime_sensor',
    # None to wait for the whole DAG, a task_id to wait for a specific task
    external_task_id=None,
    # The function's input is the DAG execution date (a pendulum datetime)
    execution_date_fn=(lambda dt: dt.add(days=-1))
)
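To illustrate what the lambda computes: it receives the current run's logical date and returns the date of the upstream run to wait for. Here is a standard-library sketch of the same one-day shift (pendulum's dt.add(days=-1) behaves like subtracting a timedelta):

```python
from datetime import datetime, timedelta

# Equivalent of pendulum's dt.add(days=-1): shift the logical date back one day
def previous_day(dt: datetime) -> datetime:
    return dt - timedelta(days=1)

# For the run logically dated 2024-03-10, the sensor would look for
# the upstream run dated 2024-03-09
print(previous_day(datetime(2024, 3, 10)))  # 2024-03-09 00:00:00
```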
Idempotent SensorOperator
Note
From my previous article about Idempotent DAGs HERE.
We also want our SensorOperator to have idempotent behavior. That can be done with the same template method as in the previous article.
This is an example of a simple DAG with an idempotent FileSensor and an idempotent DateTimeSensor, applying the idempotent concept to sensors.
from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

with DAG(
    dag_id='medium_idempotent_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1
):
    start = EmptyOperator(task_id='start')
    waiting_for_datetime = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time='{{ data_interval_end.in_tz("Asia/Bangkok").replace(hour=23) }}'
    )
    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        # File name is hello_YYYYMMDD.txt
        filepath='/home/hello_{{ data_interval_end.in_tz("Asia/Bangkok").strftime("%Y%m%d") }}.txt',
    )
    end = EmptyOperator(task_id='end')

    start >> [waiting_for_datetime, waiting_for_file] >> end
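To see what the templated filepath renders to, here is a standard-library sketch, with a hypothetical logical date of 2024-03-10 standing in for the rendered data_interval_end:

```python
from datetime import datetime

# Hypothetical logical date standing in for the rendered data_interval_end
data_interval_end = datetime(2024, 3, 10)

# Same pattern the FileSensor template produces: hello_YYYYMMDD.txt
filepath = f"/home/hello_{data_interval_end.strftime('%Y%m%d')}.txt"
print(filepath)  # /home/hello_20240310.txt
```

Because each run renders a different file name and target time, re-running an old date waits on that date's own file, which is what makes the sensors idempotent.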
Options
PokeInterval & Timeout
Every SensorOperator is built with these parameters.
1) poke_interval: after a check, how long the sensor should wait before checking again.
2) timeout: how long the sensor can wait before raising an error.
Here is a sample of these parameters.
FileSensor(
    task_id='waiting_for_file',
    filepath='/home/hello.txt',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600  # After the 1st poke, wait up to 1 hour before raising an error
)
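As a rough sketch of how the two parameters interact (ignoring the time each check itself takes), these settings allow about 120 checks before the sensor times out:

```python
poke_interval = 30  # seconds between checks
timeout = 3600      # total seconds before the sensor raises a timeout error

# Approximate number of checks before the timeout is hit
max_pokes = timeout // poke_interval
print(max_pokes)  # 120
```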
Modes
Mode is the behavior of the Sensor during the poke_interval; there are 3 different modes.
1) poke: the sensor stays active; it's fast but consumes resources.
2) reschedule: the sensor goes inactive between checks; slower but consumes fewer resources.
3) deferrable: consumes even fewer resources, and is even slower than reschedule. (Don't forget to run airflow triggerer before using deferrable.)
Note
Deferrable is more complicated than poke and reschedule. If you want to understand how it works, I suggest taking this free course from Astronomer: Airflow: Deferrable Operators (astronomer.io)
Here is an example of how to use each mode with DateTimeSensor.
from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from pendulum import datetime

with DAG(
    dag_id='medium_poke',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    poke = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}",
        mode='poke'
    )

with DAG(
    dag_id='medium_reschedule',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    reschedule = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}",
        mode='reschedule'
    )

with DAG(
    dag_id='medium_deferrable',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    deferrable = DateTimeSensorAsync(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}"
    )