Dynamic Task Definition in Airflow

I am currently trying to use Airflow to orchestrate a process in which some operators are defined dynamically and depend on the output of an earlier operator.

In the code below, t1 updates the text file with new records (they are actually read from an external queue, but for simplicity I hardcoded them as A, B and C here). Then I want to create a separate operator for each record read from this text file. These operators will create directories A, B and C, respectively, and will appear in the Airflow user interface as separate bash tasks Create_directory_A, Create_directory_B and Create_directory_C.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    'Test_DAG',
    description="Lorem ipsum.",
    start_date=datetime(2017, 3, 20),
    schedule_interval=None,
    catchup=False
)


def create_text_file(list_of_rows):
    # Write one record per line (in reality the records come from the external queue).
    text_file = open('text_file.txt', "w")
    for row in list_of_rows:
        text_file.write(row + '\n')
    text_file.close()


def read_text():
    # readlines() keeps the trailing newlines, so strip them to get clean task ids.
    txt_file = open('text_file.txt', 'r')
    return [element.strip() for element in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

# One BashOperator per record found in the file when the DAG file is parsed.
for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{params.dir_name}}",
        params={'dir_name': row},
        dag=dag
    )
    t1 >> t2

In the Airflow documentation I see that the scheduler will periodically execute it [the DAG file] to reflect the changes, if any. Does this mean there is a risk that, even though my t1 task executes before t2, the bash operators are created from the list of records as it stood before the update (i.e. as it was when the DAG file was last evaluated)?

+5
2 answers

You cannot dynamically create tasks that depend on the output of an upstream task. You are mixing up schedule time and run time. The DAG definition and its tasks are determined at schedule (parse) time. At run time, a DAG run and task instances are created. Only a task instance can produce output.
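Roughly: everything at the top level of the DAG file (including your read_text() call) runs every time the scheduler parses the file, while the code inside a python_callable runs only when a worker executes a task instance. A minimal illustration of that split, with a hypothetical DAG id and logging added purely for demonstration:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Runs every time the scheduler (or webserver) parses this file,
# i.e. at schedule/parse time, long before any task instance exists.
logging.info("DAG file parsed")

dag = DAG('parse_vs_run_demo', start_date=datetime(2017, 3, 20),
          schedule_interval=None, catchup=False)


def run_time_work():
    # Runs only when a worker executes the task instance, i.e. at run time.
    logging.info("task instance executed")


PythonOperator(task_id='Run_time_work', python_callable=run_time_work, dag=dag)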

The Airflow scheduler will build the dynamic part of the graph from whatever text_file.txt contains at scheduling time. Those tasks are then sent to the workers.

A worker will eventually execute the t1 task instance and write a new text_file.txt , but by that point the list of t2 tasks has already been calculated by the scheduler and sent to the workers.

So whatever the last t1 task instance wrote to text_file.txt is what will be used the next time the scheduler decides to run the DAG.

If your tasks are fast and your workers are not overloaded, that will be the content from the previous DAG run. If they are not, the contents of text_file.txt may be even more stale, and if you are really unlucky, the scheduler reads the file while a task instance is writing to it and you get incomplete data from read_text() .
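If the goal is only to hand the records produced by t1 to downstream work at run time (without changing the shape of the graph), XCom is the usual channel. A minimal sketch with hypothetical DAG and task ids; the return value of a PythonOperator callable is pushed to XCom automatically and pulled by the downstream task instance:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('xcom_demo', start_date=datetime(2017, 3, 20),
          schedule_interval=None, catchup=False)


def push_records():
    # The return value is pushed to XCom under the key 'return_value'.
    return ['A', 'B', 'C']


def make_directories(**context):
    # Pulled at run time, after the upstream task instance has actually produced it.
    for record in context['ti'].xcom_pull(task_ids='Push_records'):
        os.makedirs(record)


push = PythonOperator(task_id='Push_records', python_callable=push_records, dag=dag)

mkdirs = PythonOperator(task_id='Make_directories', python_callable=make_directories,
                        provide_context=True, dag=dag)

push >> mkdirs

Note that this still creates a fixed number of tasks at parse time; it only moves the data hand-off to run time.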

+4

This code will actually create only one t2 task, which will be the bash operator built from the last row returned by read_text() . I am sure that is not what you want.

A better approach would be to create a separate DAG for your t2 tasks, one that is triggered when the file is written by t1. There is a question that may help: Apache Airflow - trigger / schedule DAG rerun (File Sensor)
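A minimal sketch of that idea, assuming Airflow's contrib FileSensor and a filesystem connection named fs_default (the DAG and task ids here are made up). The directories are created inside a single callable at run time, so the result no longer depends on what the file contained at parse time:

import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.python_operator import PythonOperator

dag = DAG('Create_directories_DAG', start_date=datetime(2017, 3, 20),
          schedule_interval=None, catchup=False)

wait_for_file = FileSensor(
    task_id='Wait_for_text_file',
    filepath='text_file.txt',   # resolved against the fs_default connection's base path
    fs_conn_id='fs_default',
    poke_interval=30,
    dag=dag
)


def create_directories():
    # Read the file at run time, so the content written by t1 is what actually gets used.
    with open('text_file.txt') as txt_file:
        for row in txt_file:
            name = row.strip()
            if name:
                os.makedirs(name)


mkdirs = PythonOperator(task_id='Create_directories',
                        python_callable=create_directories, dag=dag)

wait_for_file >> mkdirs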

+2

Source: https://habr.com/ru/post/1275396/

