Airflow Executor & Friends: How Does the Executor Actually Run the Task?
A few days ago I did a small experiment with Airflow. To be precise, I used Airflow to schedule a Spark job submitted via spark-submit to a standalone cluster. I briefly mentioned how to create a DAG and Operators in the previous post.
Here’s a brief scenario used in the experiment.
- Submit a PySpark Job to a standalone cluster with client deploy mode
- Submit a PySpark Job to a standalone cluster with cluster deploy mode
Setting up a Spark standalone cluster on your machine is simple; you can find how to do that in the documentation. If you want to check the Spark master’s dashboard (served on port 8080 by default), you should use a different port, since the Airflow webserver already uses that one. Just add the parameter --webui-port <port> after ./sbin/start-master.sh.
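To make the scenario concrete, here is a minimal sketch of the kind of DAG I used, assuming a plain BashOperator wrapping spark-submit; the paths, master URL, and task IDs are illustrative, not the exact ones from my experiment.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Illustrative values only; adjust to your own setup.
SPARK_SUBMIT = "/path/to/spark/bin/spark-submit"
MASTER_URL = "spark://localhost:7077"
APP = "/path/to/pyspark_app.py"

with DAG(dag_id="spark_submit_experiment",
         start_date=datetime(2019, 1, 1),
         schedule_interval=None) as dag:

    # Experiment 1: client deploy mode (supported for PySpark on a standalone cluster)
    submit_client_mode = BashOperator(
        task_id="submit_client_mode",
        bash_command="{} --master {} --deploy-mode client {}".format(
            SPARK_SUBMIT, MASTER_URL, APP),
    )

    # Experiment 2: cluster deploy mode (raises the SparkException quoted below)
    submit_cluster_mode = BashOperator(
        task_id="submit_cluster_mode",
        bash_command="{} --master {} --deploy-mode cluster {}".format(
            SPARK_SUBMIT, MASTER_URL, APP),
    )
```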
Note that Spark currently doesn’t support cluster deploy mode for Python jobs on standalone clusters. Passing this mode as a spark-submit parameter will result in the exception below.
org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters.
Given that fact, the second experiment should end up as a failed task.
After both tasks had run to completion, the following notification was returned for each of them.
INFO - Executor reports execution of <DAG_ID>.<Task_ID> execution_date=<execution_date> exited with status success for try_number <try_num>
What confused me was that the execution of both tasks was reported with a success status. The difference was that the first experiment ended with the SUCCESS status for its TaskInstance, while the second experiment ended with the UP_FOR_RETRY status, which asks the Scheduler to retry the task.
Even though the TaskInstance statuses of the two experiments are different, the Executor reports the same status, namely SUCCESS, for both.
So I wondered, what did this report by the Executor mean?
I decided to find out the answer by performing the most boring and challenging task in software engineering - reading the code base.
You can find the Airflow’s code base here.
I’m actually going to keep this post brief. Here’s what I found from my quick investigation.
NB: I used the SequentialExecutor when performing this investigation.
How Does Airflow Execute a Task?
(A) The Scheduler Talks to the Executor
The Scheduler basically does the following after finding out which Tasks can be executed:
- Sends out the information regarding the task to the Executor (DAG ID, Task ID, execution date, try number, priority for execution, and queue mode)
- Generates the command to run the task
- Specifies the priority to run the task & queue mode
- Asks the Executor to add the generated command, task priority, queue mode, and simple task instance to a queue (let's call this bundle the Task Info)
You can find an example of the above work by inspecting the Scheduler log.
{scheduler_job.py:1148} INFO - Sending (DAG_ID, Task_ID, execution_date, try_number) to executor with priority <priority> and queue <queue_mode>
You can read the code here.
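To make this hand-off easier to picture, here is a heavily simplified sketch of it, loosely modeled on the queue kept by Airflow's base executor; the attribute names on simple_ti and the helper itself are illustrative, not the real API.

```python
# Heavily simplified sketch of the Scheduler -> Executor hand-off (not actual Airflow code).
# The attribute names on simple_ti are illustrative.

def enqueue_task_instance(executor, simple_ti, dag_file_path):
    # The key uniquely identifies one try of one task.
    key = (simple_ti.dag_id, simple_ti.task_id,
           simple_ti.execution_date, simple_ti.try_number)

    # Roughly the "airflow run" command the Scheduler generates.
    command = ["airflow", "run",
               simple_ti.dag_id, simple_ti.task_id,
               simple_ti.execution_date.isoformat(),
               "--local", "--pool", "default_pool",
               "-sd", dag_file_path]

    # This is where "Adding to queue: [...]" shows up in the log.
    executor.queued_tasks[key] = (command,
                                  simple_ti.priority_weight,  # task priority
                                  simple_ti.queue,            # queue mode
                                  simple_ti)
```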
(B) The Executor Performs the Scheduler's Request
The Executor adds the Task Info from the Scheduler to a queue.
You can find this work in the Scheduler’s log.
{base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', DAG_ID, Task_ID, execution_date, '--local', '--pool', 'default_pool', '-sd', path/to/dag/file]
You can find the method used by the Executor to add the Task Info to the queue here.
(C) The Scheduler Asks the Executor to Run the Task
The Scheduler sends a heartbeat to the Executor. You can find the code here.
The heartbeat triggers the Executor to run the task. Please refer to the code.
Before running the task, the Executor sorts the queued Task Info by task priority in descending order. I think a bigger value denotes higher priority in this case. Please refer to the code.
The Executor then adds the queued Task Info to a list. However, based on the source code, only the key and the command to run the task are added. The key has the following form: (dag_id, task_id, execution_date, try_number). Please refer to the code.
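Paraphrasing what the heartbeat does with the queue (a simplified sketch, not the exact base executor code):

```python
# Simplified sketch of the sorting and trimming done during the Executor's heartbeat.
def pick_tasks_to_run(queued_tasks, open_slots):
    # queued_tasks: {key: (command, priority, queue, simple_task_instance)}
    # A bigger priority value comes first, hence reverse=True.
    sorted_queue = sorted(queued_tasks.items(),
                          key=lambda item: item[1][1],
                          reverse=True)

    to_run = []
    for key, (command, _priority, _queue, _simple_ti) in sorted_queue[:open_slots]:
        queued_tasks.pop(key)
        to_run.append((key, command))  # only the key and the command survive
    return to_run
```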
(D) The Executor Starts to Run the Task
The Executor calls the sync method to execute the command used to run the task. Please refer to the code.
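With the Sequential Executor, that sync step boils down to roughly the following (a simplified paraphrase, not the exact code):

```python
import subprocess

# Simplified paraphrase of the Sequential Executor's sync step.
def sync(commands_to_run, change_state):
    for key, command in commands_to_run:
        try:
            # Runs "airflow run ..." as a subprocess and waits for it to finish.
            subprocess.check_call(command, close_fds=True)
            change_state(key, "success")
        except subprocess.CalledProcessError:
            # Only a non-zero exit code of the subprocess lands here.
            change_state(key, "failed")
```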
The Scheduler waits for notification from the Executor regarding whether the task has finished.
(E) The Executor Asks Other Layers For Help
The Executor executes the task by calling the Local Task Job.
The Local Task Job retrieves the Task Runner specified in the configuration file (airflow.cfg). Please refer to the code. Below is an example of the configuration parameter.
# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner
When the Task Runner is retrieved, the parent class's __init__ method is executed. This __init__ creates another command used to run the task, built so that it can be run in a different environment. A common use case for this might be distributed workers (CMIIW). Please refer to the code.
The StandardTaskRunner inherits BaseTaskRunner.
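As a rough outline, and assuming I read the code correctly, the runner contract looks something like this (the command shown is only a placeholder for what the parent __init__ builds):

```python
# Rough outline of the task runner contract (simplified; not the actual classes).
class BaseTaskRunner:
    def __init__(self, local_task_job):
        # Re-builds the command so that it can be executed in a different
        # environment (e.g. as another user); shown here as a plain placeholder.
        ti = local_task_job.task_instance
        self.full_cmd = ["airflow", "run", ti.dag_id, ti.task_id,
                         ti.execution_date.isoformat(), "--raw"]

    def start(self):         # spawn the process that runs full_cmd
        raise NotImplementedError

    def return_code(self):   # poll the process; None while it is still running
        raise NotImplementedError

    def terminate(self):     # kill the process if needed
        raise NotImplementedError


class StandardTaskRunner(BaseTaskRunner):
    """Runs full_cmd in a plain subprocess on the same machine."""
```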
The Local Task Job starts the Task Runner. Please refer to the code.
(F) The Task Runner Executes the Operator
The Task Runner finally executes the command used to run the task (full_cmd). Please refer to the code.
The full_cmd is used to execute the run() method in TaskInstance.py. Please refer to the code.
The run() method checks the dependencies (code).
If the dependencies are met, it executes the Operator via its execute() method. Please refer to the code.
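Condensing this section into a sketch (the dependency-checking helper passed in and the state handling are my simplifications, not the real TaskInstance internals):

```python
# Very condensed sketch of what happens inside the "airflow run" subprocess.
def run_task_instance(task_instance, context, dependencies_met):
    if not dependencies_met(task_instance):   # e.g. upstream states, retry window
        return                                # nothing to do yet

    try:
        task_instance.task.execute(context)   # the Operator's execute() method
        task_instance.state = "success"
    except Exception:
        # The state becomes up_for_retry (or failed) here. Importantly, this all
        # happens inside the subprocess, not inside the Executor.
        retries_left = task_instance.try_number <= task_instance.max_tries
        task_instance.state = "up_for_retry" if retries_left else "failed"
```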
(G) The Operator in Action
The Operator executes the task implementation. Please refer to the code.
After the Operator has finished (the Local Task Job got the return code from the Task Runner), the Task Runner is then terminated.
(H) The Local Task Job Reports Back to the Executor
The Executor changes the state of the Task Instance. You can find it by inspecting the Scheduler log.
Changing state: key = (DAG_ID, Task_ID, execution_datetime, try_number)
Please refer to the code.
The Conclusion
Whether the Task Instance was successful or not, if there’s no subprocess.CalledProcessError exception, the STATE reported by the Executor to the Scheduler will be SUCCESS.
According to the source code (here), a CalledProcessError exception means that the task failed to run to completion. This is not related to the return code returned by the Task Instance.
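To see what check_call actually reacts to, here is a tiny standalone demonstration (using the dummy commands true and false; nothing Airflow-specific):

```python
import subprocess

# check_call raises CalledProcessError only when the subprocess itself exits
# with a non-zero code. In the experiment above, the "airflow run" wrapper
# exited with 0 even though the TaskInstance inside it went to UP_FOR_RETRY,
# so the Executor never saw an exception and reported SUCCESS.

subprocess.check_call(["true"])           # exit code 0 -> no exception
try:
    subprocess.check_call(["false"])      # exit code 1 -> CalledProcessError
except subprocess.CalledProcessError as err:
    print("wrapper process failed, return code:", err.returncode)
```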
The Executor does not consider the return code of the Task Instance because the return code is never evaluated anywhere in the workflow (from A to H). Although the return code is retrieved as the signal to terminate the Local Task Job (code), nothing further is done with it, such as reporting it back to the Executor. We can therefore conclude that the Executor does not receive any information about the return code of the Task Instance.
In my opinion, since the termination of the Local Task Job denotes that the Task Instance has finished (a return code was obtained), the Executor presumes that the Task Instance has already completed. It does not care about the completion status of the Task Instance. What it cares about is that the Task Instance has been executed by all the layers below the Executor, such as the Local Task Job, the Task Runner, and the Operator. If no layer raises an exception, the task is considered to have run smoothly. That’s why the status reported to the Scheduler is success.
Last but not least, an example of a case where the CalledProcessError exception happens can be found here.
Thank you for reading.
I really appreciate any feedback.