Apache Airflow - trigger/schedule a DAG rerun on completion (File Sensor)

Good morning.

I am trying to configure a DAG to:

  1. Watch/sense for a file to hit a network folder
  2. Process the file
  3. Archive the file

Using the tutorials online and StackOverflow, I was able to come up with the following DAG and Operator that successfully achieve the objective; however, I would like the DAG to be rescheduled or rerun on completion so it starts watching/sensing for another file.

I tried setting max_active_runs: 1 and then schedule_interval: timedelta(seconds=5); this reschedules the DAG, but it starts queuing tasks and locks the file.

Any ideas on how I could rerun the DAG after archive_task are welcome.

thanks

DAG code

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.models import Variable
    # OmegaFileSensor and ArchiveFileOperator are exposed by the plugin below
    from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator

    default_args = {
        'owner': 'glsam',
        'depends_on_past': False,
        'start_date': datetime.now(),
        'provide_context': True,
        'retries': 100,
        'retry_delay': timedelta(seconds=30),
        'max_active_runs': 1,
        'schedule_interval': timedelta(seconds=5),
    }

    dag = DAG('test_sensing_for_a_file', default_args=default_args)

    filepath = Variable.get("soucePath_Test")
    filepattern = Variable.get("filePattern_Test")
    archivepath = Variable.get("archivePath_Test")

    sensor_task = OmegaFileSensor(
        task_id='file_sensor_task',
        filepath=filepath,
        filepattern=filepattern,
        poke_interval=3,
        dag=dag)


    def process_file(**context):
        # the sensor pushes the matched file name to XCom; pull it here
        file_to_process = context['task_instance'].xcom_pull(
            key='file_name', task_ids='file_sensor_task')
        file = open(filepath + file_to_process, 'w')
        file.write('This is a test\n')
        file.write('of processing the file')
        file.close()


    process_task = PythonOperator(
        task_id='process_the_file',
        python_callable=process_file,
        provide_context=True,
        dag=dag)

    archive_task = ArchiveFileOperator(
        task_id='archive_file',
        filepath=filepath,
        archivepath=archivepath,
        dag=dag)

    sensor_task >> process_task >> archive_task
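(One thing worth noting about the attempt described above: schedule_interval and max_active_runs are DAG-level arguments, and the DAG constructor does not read them out of default_args, which only supplies task-level defaults. A minimal sketch of the intended placement, assuming Airflow 1.x:)

    # sketch only: DAG-level settings belong on the DAG itself, not in default_args
    dag = DAG(
        'test_sensing_for_a_file',
        default_args=default_args,                # task-level defaults only
        schedule_interval=timedelta(seconds=5),   # DAG-level setting
        max_active_runs=1)                        # DAG-level setting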

FILE SENSOR OPERATOR

    import os
    import re

    from airflow.models import BaseOperator
    from airflow.operators.sensors import BaseSensorOperator
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.decorators import apply_defaults


    class ArchiveFileOperator(BaseOperator):
        @apply_defaults
        def __init__(self, filepath, archivepath, *args, **kwargs):
            super(ArchiveFileOperator, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.archivepath = archivepath

        def execute(self, context):
            file_name = context['task_instance'].xcom_pull(
                'file_sensor_task', key='file_name')
            # move the processed file into the archive folder
            os.rename(self.filepath + file_name, self.archivepath + file_name)


    class OmegaFileSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, filepath, filepattern, *args, **kwargs):
            super(OmegaFileSensor, self).__init__(*args, **kwargs)
            self.filepath = filepath
            self.filepattern = filepattern

        def poke(self, context):
            full_path = self.filepath
            file_pattern = re.compile(self.filepattern)
            directory = os.listdir(full_path)
            for files in directory:
                if re.match(file_pattern, files):
                    # hand the matched file name to downstream tasks via XCom
                    context['task_instance'].xcom_push('file_name', files)
                    return True
            return False


    class OmegaPlugin(AirflowPlugin):
        name = "omega_plugin"
        operators = [OmegaFileSensor, ArchiveFileOperator]
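(For anyone reproducing this: the OmegaPlugin class at the end is what exposes OmegaFileSensor and ArchiveFileOperator under the from airflow.operators import ... line used in the DAG file. The layout would typically be something like the following; the file names are only illustrative:)

    $AIRFLOW_HOME/
        dags/
            test_sensing_for_a_file.py   # the DAG above
        plugins/
            omega_plugin.py              # the sensor/operator/plugin module above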
2 answers

Set schedule_interval=None and use the airflow trigger_dag command from a BashOperator to start the next execution upon completion of the previous one.

    from airflow.operators.bash_operator import BashOperator

    trigger_next = BashOperator(
        task_id="trigger_next",
        bash_command="airflow trigger_dag 'your_dag_id'",
        dag=dag)

    sensor_task >> process_task >> archive_task >> trigger_next

You can kick off your first run manually with the same airflow trigger_dag command, and then the trigger_next task will automatically trigger the next one. We have been using this in production for many months, and it works perfectly.
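For that very first manual run, the command would be (filling in the dag_id from the question's DAG definition):

    airflow trigger_dag test_sensing_for_a_file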


Dmitri's method worked perfectly.

I also found in my reading that setting schedule_interval=None and then using a TriggerDagRunOperator worked equally well:

    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_RBCPV99_rerun',
        trigger_dag_id="RBCPV99_v2",
        dag=dag)

    sensor_task >> process_task >> archive_task >> trigger
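(The trigger_dag_id above is this answerer's own DAG id. As a sketch of the same idea applied to the question's DAG, the operator would point back at its own dag_id so the run loops back to sensing; the task_id here is only illustrative:)

    trigger = TriggerDagRunOperator(
        task_id='trigger_rerun',                   # illustrative task_id
        trigger_dag_id='test_sensing_for_a_file',  # this DAG's own dag_id, so it loops
        dag=dag)

    sensor_task >> process_task >> archive_task >> trigger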
