Understanding backfill in Airflow

I am just starting out with Airflow (originally from Airbnb) and I still don't understand how backfill works or when it runs.

In particular, there are two use cases that confuse me:

  • If I run airflow scheduler for a few minutes, stop it for a minute, and then restart it, my DAG seems to run extra tasks for the first 30 seconds or so, and then continues as usual (a run every 10 seconds). Are these extra tasks “backfilled” runs that were not completed while the scheduler was stopped? If so, how can I tell Airflow not to run them?

  • If I run airflow scheduler for a few minutes, run airflow clear MY_tutorial, and then restart airflow scheduler, it seems to start a TON of additional tasks. Are these tasks also “backfilled” somehow? Or am I missing something?

I currently have a very simple DAG:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    default_args = {
        'owner': 'me',
        'depends_on_past': False,
        'start_date': datetime(2016, 10, 4),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    dag = DAG(
        'MY_tutorial',
        default_args=default_args,
        schedule_interval=timedelta(seconds=10))

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 8) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

    second_template = """
    touch ~/airflow/logs/test
    echo $(date) >> ~/airflow/logs/test
    """

    t4 = BashOperator(
        task_id='write_test',
        bash_command=second_template,
        dag=dag)

    t1.set_upstream(t4)
    t2.set_upstream(t1)
    t3.set_upstream(t1)

The only two things I have changed in my Airflow configuration:

  • I switched from the sqlite db to a postgres db
  • I am using CeleryExecutor instead of SequentialExecutor

Thank you for your help!

+5
2 answers

When you flip a DAG's toggle to “on”, the scheduler will backfill a DAG run for every schedule interval that has no recorded status, starting from the start_date specified in default_args.

For example: if the start_date were “2017-01-21” and you switched the scheduling toggle on at “2017-01-22T00:00:00”, with the DAG configured to run hourly, the scheduler would backfill 24 DAG runs and then continue running at the scheduled interval.

This is essentially what is happening in both of your cases. In #1, it backfills the roughly 3 runs missed during the 30 seconds the scheduler was stopped. In #2, because airflow clear wiped the recorded task state, it backfills every DAG run from start_date to “now”.
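The backfill arithmetic above can be sketched in plain Python. This is an illustrative approximation, not Airflow's actual scheduler code — among other things, the real scheduler fires each run at the *end* of its interval:

```python
from datetime import datetime, timedelta

def missed_runs(start_date, now, interval):
    """Count the completed schedule intervals between start_date and now --
    roughly the number of DAG runs the scheduler will backfill."""
    runs = 0
    t = start_date
    while t + interval <= now:
        runs += 1
        t += interval
    return runs

# The example from the answer: hourly DAG, toggled on one day after start_date.
print(missed_runs(datetime(2017, 1, 21),
                  datetime(2017, 1, 22),
                  timedelta(hours=1)))  # 24
```

The same arithmetic explains case #1: a 10-second schedule_interval with the scheduler down for ~30 seconds yields about 3 missed intervals to backfill.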

There are two ways around this:

  • Set start_date to a date in the future, so the scheduler only starts scheduling DAG runs once that date is reached. Note: if you change the start_date of a DAG, you should also change the name of the DAG, because of how the start date is stored in the Airflow DB.
  • Run a backfill manually from the command line with the “-m” (mark success) flag, which tells Airflow not to actually execute anything, but simply to mark the runs as successful in the database ( https://airflow.incubator.apache.org/cli.html ).

    e.g. airflow backfill MY_tutorial -m -s 2016-10-04 -e 2017-01-22T14:28:30
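For completeness: Airflow 1.8 and later also expose a DAG-level catchup flag (and a catchup_by_default setting in airflow.cfg) that disables this backfilling entirely. A minimal sketch, assuming a recent Airflow install:

```python
from datetime import datetime, timedelta

from airflow import DAG

# With catchup=False the scheduler creates a run only for the most recent
# interval when the DAG is unpaused, instead of backfilling every missed
# interval since start_date.
dag = DAG(
    'MY_tutorial',
    start_date=datetime(2016, 10, 4),
    schedule_interval=timedelta(seconds=10),
    catchup=False,
)
```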

+16

The On/Off toggle in the Airflow UI really just means “PAUSE”: switching a DAG off pauses it after the currently running tasks finish, and switching it back on resumes scheduling from that date, catching up whatever was missed while it was paused.

0

Source: https://habr.com/ru/post/1257748/
