I am trying to use Airflow to orchestrate a process in which some operators are defined dynamically and depend on the output of another (earlier) operator.
In the code below, t1 updates the text file with new records (in reality they are read from an external queue, but for simplicity I have hardcoded them as A, B and C here). Then I want to create a separate operator for each record read from this text file. These operators will create directories A, B and C, respectively, and should appear in the Airflow UI as separate BashOperator tasks Create_directory_A, Create_directory_B and Create_directory_C.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    'Test_DAG',
    description="Lorem ipsum.",
    start_date=datetime(2017, 3, 20),
    schedule_interval=None,
    catchup=False
)


def create_text_file(list_of_rows):
    # Write one record per line to text_file.txt.
    with open('text_file.txt', 'w') as text_file:
        for row in list_of_rows:
            text_file.write(row + '\n')


def read_text():
    # Return the records previously written to text_file.txt.
    with open('text_file.txt', 'r') as txt_file:
        return [line.strip() for line in txt_file.readlines()]


t1 = PythonOperator(
    task_id='Create_text_file',
    python_callable=create_text_file,
    op_args=[['A', 'B', 'C']],
    dag=dag
)

for row in read_text():
    t2 = BashOperator(
        task_id='Create_directory_{}'.format(row),
        bash_command="mkdir {{ params.dir_name }}",
        params={'dir_name': row},
        dag=dag
    )
    t1 >> t2
```
In the Airflow documentation I see that the scheduler will periodically execute it [the DAG file] to reflect the changes, if any. Does this mean there is a risk that, even though my t1 task runs before t2, the BashOperators are created from the list of records as it stood before the update, i.e. from whatever the file contained at the time the DAG file was last parsed?
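To make the concern concrete, here is a minimal sketch of how I understand the parse-time behaviour (the logging call and the missing-file guard are my additions for illustration, not part of the original DAG):

```python
import logging
import os


def read_text():
    # This function runs while the scheduler parses the DAG file,
    # NOT when t1 executes, so it only ever sees what text_file.txt
    # contained at parse time (or nothing at all on the first parse).
    logging.info("read_text() called at DAG-parse time")
    if not os.path.exists('text_file.txt'):
        return []  # file not written yet: no Create_directory_* tasks built
    with open('text_file.txt', 'r') as txt_file:
        return [line.strip() for line in txt_file]
```

If that is right, every Create_directory_* task in the for-loop above would be constructed from the parse-time contents of the file, regardless of the t1 >> t2 dependency.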
Dawid