Airflow Executor & Friends: How Does the Executor Actually Run the Task?


A few days ago I did a small experiment with Airflow. To be precise, I scheduled Airflow to run a Spark job via spark-submit on a standalone cluster. I briefly covered how to create a DAG and Operators in the previous post.

Here’s a brief scenario used in the experiment (a sketch of the DAG follows the list).

  • Submit a PySpark Job to a standalone cluster with client deploy mode
  • Submit a PySpark Job to a standalone cluster with cluster deploy mode
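
For illustration, here is a minimal sketch of what such a DAG might look like, using the BashOperator. The master URL, application path, and IDs below are hypothetical placeholders, not the exact values from my experiment.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical master URL and application path -- adjust to your setup.
SPARK_MASTER = "spark://localhost:7077"
PYSPARK_APP = "/path/to/app.py"

dag = DAG(
    dag_id="spark_submit_experiment",  # hypothetical DAG ID
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

# Experiment 1: client deploy mode -- expected to succeed.
submit_client = BashOperator(
    task_id="submit_client_mode",
    bash_command="spark-submit --master %s --deploy-mode client %s"
                 % (SPARK_MASTER, PYSPARK_APP),
    dag=dag,
)

# Experiment 2: cluster deploy mode -- expected to fail, since cluster
# deploy mode is not supported for Python apps on standalone clusters.
submit_cluster = BashOperator(
    task_id="submit_cluster_mode",
    bash_command="spark-submit --master %s --deploy-mode cluster %s"
                 % (SPARK_MASTER, PYSPARK_APP),
    dag=dag,
)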

Setting up a Spark standalone cluster on your machine is simple; you can find out how in the documentation. If you want to check the Spark master’s dashboard (served on port 8080 by default), you should use a different port, since the Airflow webserver already uses that one. Just add the parameter --webui-port <port> after ./sbin/start-master.sh.

As you may have noticed, Spark currently doesn’t support the cluster deploy mode for Python applications on standalone clusters. Passing this mode as a spark-submit parameter results in the exception below.

org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters.

Given the above fact, the second experiment should end up as a failed task.

After both tasks had run to completion, the following notification was logged for each of them.

INFO - Executor reports execution of <DAG_ID>.<Task_ID> execution_date=<execution_date> exited with status success for try_number <try_num>

What confused me was the fact that the execution of both tasks returned a success status. What differed was that the first experiment’s TaskInstance ended up in the SUCCESS state, while the second experiment’s ended up in UP_FOR_RETRY. The UP_FOR_RETRY state asks the Scheduler to schedule another retry for the second experiment’s task.

Even though the TaskInstance states of the two experiments are different, the Executor reports the same status, SUCCESS, for both of them.

So I wondered, what did this report by the Executor mean?

I decided to find out the answer by performing the most boring and challenging task in software engineering - reading the code base.

You can find the Airflow’s code base here.

I’m going to keep this post brief. Here’s what I found from my quick investigation.

NB: I used the Sequential Executor when performing this investigation.

How Does Airflow Execute a Task?

(A) The Scheduler Talks to the Executor

The Scheduler basically does the following after finding out what Tasks can be executed.

  • Sends the task’s information to the Executor (DAG ID, Task ID, execution date, try number, execution priority, and queue)
  • Generates the command used to run the task
  • Specifies the priority and the queue for running the task
  • Asks the Executor to add the generated command, the task priority, the queue, and the simple task instance (let’s call this bundle the Task Info) to a queue

You can find an example of the above work by inspecting the Scheduler log.

{scheduler_job.py:1148} INFO - Sending (DAG_ID, Task_ID, execution_date, try_number) to executor with priority <priority> and queue <queue_mode>

You can read the code here.
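
To give a feel for this step, here is a rough paraphrase of the relevant scheduler code (based on Airflow 1.10.x; simplified, not the verbatim source):

# Rough paraphrase of the Scheduler's enqueue step (Airflow 1.10.x).
# 'command' is the generated command, e.g.
# ['airflow', 'run', dag_id, task_id, execution_date, '--local', ...]
self.log.info(
    "Sending %s to executor with priority %s and queue %s",
    simple_task_instance.key, priority, queue,
)
self.executor.queue_command(
    simple_task_instance,
    command,
    priority=priority,
    queue=queue,
)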

(B) The Executor Performs the Scheduler's Request

The Executor adds the Task Info from the Scheduler to a queue.

You can find a record of this work in the Scheduler’s log.

{base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', DAG_ID, Task_ID, execution_date, '--local', '--pool', 'default_pool', '-sd', path/to/dag/file]

You can find the method used by the Executor to add the Task Info to the queue here.
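
Paraphrasing BaseExecutor.queue_command (Airflow 1.10.x; lightly simplified), the Task Info ends up in a dict keyed by the task’s identity:

# Rough paraphrase of BaseExecutor.queue_command (Airflow 1.10.x).
def queue_command(self, simple_task_instance, command, priority=1, queue=None):
    key = simple_task_instance.key  # (dag_id, task_id, execution_date, try_number)
    if key not in self.queued_tasks and key not in self.running:
        self.log.info("Adding to queue: %s", command)
        self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
    else:
        self.log.info("could not queue task %s", key)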

(C) The Scheduler Asks the Executor to Run the Task

The Scheduler sends a heartbeat to the Executor. You can find the code here.

The heartbeat triggers the Executor to run the task. Please refer to the code.

Before running the task, the Executor sorts the queued Task Info by task priority in descending order; since the sort is descending, a bigger priority value means the task is executed earlier. Please refer to the code.

The Executor then adds the queued Task Info to a list. However, based on the source code, only the key and the command to run the task are added. The key has the following form: (dag_id, task_id, execution_date, try_number). Please refer to the code.
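
Putting the pieces of this step together, the heartbeat roughly does the following (paraphrased from BaseExecutor.heartbeat in Airflow 1.10.x; simplified, and the details vary a bit between versions):

# Rough paraphrase of BaseExecutor.heartbeat (Airflow 1.10.x).
open_slots = self.parallelism - len(self.running)

# Sort by priority in descending order: a bigger priority value
# means the task is popped, and therefore executed, earlier.
sorted_queue = sorted(
    self.queued_tasks.items(),
    key=lambda x: x[1][1],  # x = (key, (command, priority, queue, simple_ti))
    reverse=True,
)

for _ in range(min(open_slots, len(sorted_queue))):
    key, (command, _, queue, simple_ti) = sorted_queue.pop(0)
    self.queued_tasks.pop(key)
    self.running[key] = command
    # For the Sequential Executor, execute_async merely appends
    # (key, command) to a list of commands to run.
    self.execute_async(key=key, command=command, queue=queue)

self.sync()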

(D) The Executor Starts to Run the Task

The Executor calls the sync method to execute the command used to run the task. Please refer to the code.
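
Since this is the heart of the answer, here is a paraphrase of the Sequential Executor’s execute_async and sync methods (Airflow 1.10.x; lightly simplified):

# Rough paraphrase of the Sequential Executor (Airflow 1.10.x).
import subprocess

from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import State


class SequentialExecutor(BaseExecutor):
    def start(self):
        self.commands_to_run = []

    def execute_async(self, key, command, queue=None, executor_config=None):
        self.commands_to_run.append((key, command))

    def sync(self):
        for key, command in self.commands_to_run:
            self.log.info("Executing command: %s", command)
            try:
                # Raises CalledProcessError only if the 'airflow run'
                # subprocess exits with a non-zero return code.
                subprocess.check_call(command, close_fds=True)
                self.change_state(key, State.SUCCESS)
            except subprocess.CalledProcessError as e:
                self.change_state(key, State.FAILED)
                self.log.error("Failed to execute task %s.", str(e))
        self.commands_to_run = []

Note that check_call only inspects the return code of the airflow run process it spawned; it knows nothing about what happens inside that process.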

The Scheduler waits for notification from the Executor regarding whether the task has finished.

(E) The Executor Asks Other Layers For Help

The Executor executes the task by way of a Local Task Job: the airflow run command shown earlier carries the --local flag, which spins up a Local Task Job.

The Local Task Job retrieves the Task Runner specified in the configuration file (airflow.cfg). Please refer to the code. Below is an example of the configuration parameter.

# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner

When the Task Runner is retrieved, the parent class’s __init__ method is executed. This __init__ builds another command to run the task. As far as I can tell, it is the same airflow run command with the --raw flag, optionally prefixed with sudo so that it can run in a different environment; the common use case for this is task impersonation via run_as_user. Please refer to the code.

The StandardTaskRunner inherits from the BaseTaskRunner.

The Local Task Job starts the Task Runner. Please refer to the code.
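
In outline, the whole Local Task Job flow looks like this (paraphrased from LocalTaskJob._execute in Airflow 1.10.x; heavily simplified):

# Rough paraphrase of LocalTaskJob._execute (Airflow 1.10.x).
self.task_runner = get_task_runner(self)  # e.g. the StandardTaskRunner
try:
    self.task_runner.start()
    while True:
        # Let the Scheduler know this job is still alive.
        self.heartbeat()
        # A non-None return code means the raw task process has exited.
        return_code = self.task_runner.return_code()
        if return_code is not None:
            self.log.info("Task exited with return code %s", return_code)
            return
finally:
    self.on_kill()  # terminates the Task Runner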

(F) The Task Runner Executes the Operator

The Task Runner finally executes the command used to run the task (full_cmd). Please refer to the code.

The full_cmd is used to execute the run() method of the TaskInstance. Please refer to the code.

The run() method checks the dependencies (code).

If the dependencies are met, the Operator is executed via its execute() method. Please refer to the code.
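
Roughly, the flow inside the Task Instance looks like this (paraphrased from Airflow 1.10.x; heavily simplified):

# Rough paraphrase of TaskInstance.run (Airflow 1.10.x).
def run(self, **kwargs):
    # Check the dependencies first; if they are not met, do not run.
    if not self.check_and_change_state_before_execution(**kwargs):
        return
    self._run_raw_task(**kwargs)

def _run_raw_task(self, **kwargs):
    context = self.get_template_context()
    try:
        self.task.execute(context=context)  # the Operator in action
        self.state = State.SUCCESS
    except Exception as e:
        # Records the failure (UP_FOR_RETRY or FAILED) in the database
        # and re-raises, so the raw task process exits non-zero.
        self.handle_failure(e)
        raise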

(G) The Operator in Action

The Operator executes the task implementation. Please refer to the code.
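
To make this concrete, here is a minimal, entirely hypothetical Operator (the class name and command handling are made up for illustration). The point is that an Operator signals failure by raising an exception from execute():

import subprocess

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ShellCommandOperator(BaseOperator):
    """Hypothetical operator that runs a shell command."""

    @apply_defaults
    def __init__(self, command, *args, **kwargs):
        super(ShellCommandOperator, self).__init__(*args, **kwargs)
        self.command = command

    def execute(self, context):
        return_code = subprocess.call(self.command, shell=True)
        if return_code != 0:
            # This exception is what eventually puts the Task Instance
            # into UP_FOR_RETRY; the Executor never sees it directly.
            raise AirflowException("Command failed: %s" % self.command)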

After the Operator has finished (i.e., the Local Task Job got the return code from the Task Runner), the Task Runner is terminated.

(H) The Local Task Job Reports Back to the Executor

The Local Task Job’s exit code is effectively the report: once the airflow run subprocess finishes, the Executor changes the state it records for the task’s key. You can find this by inspecting the Scheduler log.

Changing state: key = (DAG_ID, Task_ID, execution_datetime, try_number)

Please refer to the code.
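
Paraphrasing BaseExecutor again (Airflow 1.10.x; simplified), the "state change" is just a record in an event buffer that the Scheduler later drains:

# Rough paraphrase of BaseExecutor (Airflow 1.10.x).
def change_state(self, key, state):
    self.log.debug("Changing state: %s", key)
    self.running.pop(key, None)
    self.event_buffer[key] = state

def get_event_buffer(self):
    # The Scheduler periodically drains this buffer; that is where the
    # "Executor reports execution of ... exited with status success"
    # log line comes from. (Filtering by DAG IDs is omitted here.)
    cleared_events = self.event_buffer
    self.event_buffer = {}
    return cleared_events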


The Conclusion

Whether the Task Instance was successful or not, as long as there is no subprocess.CalledProcessError exception, the state reported by the Executor to the Scheduler will be SUCCESS.

According to the source code (here), a CalledProcessError means the airflow run subprocess itself exited with a non-zero return code, i.e., the machinery running the task failed to execute it through to the end. It does not reflect the return code produced by the Task Instance.
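
You can observe this behavior of subprocess.check_call in isolation with a standalone snippet (unrelated to Airflow): only the exit code of the immediate child process matters.

import subprocess
import sys

# Child exits with 0: no exception, which the Executor maps to SUCCESS.
subprocess.check_call([sys.executable, "-c", "raise SystemExit(0)"])

# Child exits with 1: raises CalledProcessError, mapped to FAILED.
try:
    subprocess.check_call([sys.executable, "-c", "raise SystemExit(1)"])
except subprocess.CalledProcessError as e:
    print("caught: %s" % e)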

The Executor does not consider the Task Instance’s return code because nowhere in the workflow (from A to H) is that return code evaluated. Although the return code is retrieved as the sign to terminate the Local Task Job (code), there is no further processing of it, such as reporting it back to the Executor. We can therefore conclude that the Executor receives no information about the Task Instance’s return code.

In my opinion, since the termination of the Local Task Job denotes that the Task Instance has finished (its return code was retrieved), the Executor presumes that the Task Instance has already completed. It does not care about the Task Instance’s completion status; what it cares about is that the Task Instance was executed by all the layers below the Executor, namely the Local Task Job, the Task Runner, and the Operator. As long as none of those layers raises an exception that makes the subprocess exit non-zero, everything is considered to have run smoothly. That’s why the status reported to the Scheduler is success.

Last but not least, an example of a case where the CalledProcessError exception occurs can be found here.


Thank you for reading.

I’d really appreciate any feedback.