Airflow: Pass a Dynamic Value to a SubDagOperator

I am new to Airflow.
I have a scenario in which the parent DAG needs to pass a dynamic number (let's call it n) to a SubDAG.
The SubDAG will use this number to dynamically create n parallel tasks.

The Airflow documentation does not cover how to do this, so I explored a couple of approaches:

Option 1 (using xcom_pull)

I tried to pass the XCom value, but for some reason the SubDAG does not receive the passed value.

Parent DAG file

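import json

from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

# dag, default_args, PARENT_DAG_NAME, CHILD_DAG_NAME and sub_dag
# are defined or imported elsewhere in the parent DAG file.


def load_dag(**kwargs):
    # Read the dynamic value from the conf passed when the DAG run is triggered.
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    # PythonOperator pushes the callable's return value to XCom.
    return dag_data

# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)


t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    # sub_dag() runs when this file is parsed, so it receives this Jinja string unrendered.
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
    default_args=default_args,
    dag=dag,
)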
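One detail worth checking in load_dag: json.dumps is applied twice, so the value pushed to XCom is a JSON string whose number_of_runs field is itself a string ("3", not 3). Even if that value reached the SubDAG, range() would still need an int. Below is a minimal plain-Python sketch of decoding such a payload, assuming the conf value is an integer; build_payload and decode_number_of_runs are hypothetical helpers, not part of Airflow.

```python
import json


def build_payload(number_of_runs):
    # Mirrors load_dag: the value is JSON-encoded once on its own...
    encoded = json.dumps(number_of_runs)
    # ...and then a second time as part of the outer dict.
    return json.dumps({"number_of_runs": encoded})


def decode_number_of_runs(payload):
    # Hypothetical helper: undo both layers of encoding and return an int.
    inner = json.loads(payload)["number_of_runs"]  # e.g. the string "3"
    return int(json.loads(inner))


payload = build_payload(3)
print(payload)                          # {"number_of_runs": "3"}
print(decode_number_of_runs(payload))   # 3
```

Returning the plain integer from load_dag (no json.dumps at all) would avoid the second decoding step entirely.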

SubDAG file

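from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    # A SubDAG's dag_id must be '<parent_dag_id>.<subdag_task_id>'.
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)

    variable_names = {}

    # Create num_of_runs parallel tasks; each one needs a unique task_id.
    for i in range(num_of_runs):
        variable_names['task' + str(i + 1)] = DummyOperator(
            task_id='task' + str(i + 1),
            dag=dag_subdag,
        )

    return dag_subdag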
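The loop in sub_dag depends on two things: num_of_runs must be an actual integer (range() raises TypeError when given a string, such as an unrendered template), and every generated task needs its own task ID, since task IDs must be unique within a DAG. A plain-Python sketch of that ID generation, using a hypothetical helper task_ids_for that also accepts a numeric string:

```python
def task_ids_for(num_of_runs):
    # Accept an int or a numeric string such as "3"; anything else fails loudly.
    n = int(num_of_runs)
    if n < 0:
        raise ValueError("num_of_runs must be non-negative")
    # One unique ID per parallel task: task1, task2, ..., taskN.
    return ["task" + str(i + 1) for i in range(n)]


print(task_ids_for(3))    # ['task1', 'task2', 'task3']
print(task_ids_for("2"))  # ['task1', 'task2']

try:
    # An unrendered Jinja string is not a number, so the conversion fails.
    task_ids_for("{{ ti.xcom_pull(task_ids='load_config') }}")
except ValueError as exc:
    print("not a number:", exc)
```

Failing with a clear ValueError at parse time is easier to diagnose than a TypeError from deep inside the DAG definition.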

Option 2

I also tried passing number_of_runs as a global variable, but that did not work either.

Option 3

… a "sub DAG File doesn't exist" error. …



Regarding option 3: … load_config … In the subdag factory:

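import os
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator


def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    # File generated beforehand (e.g. by load_config), one task name per line.
    file_path = "/path/to/generated/file"
    if os.path.exists(file_path):
        with open(file_path) as data_file:
            list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                # strip() removes the trailing newline that readlines() keeps.
                task_id='task_' + task.strip(),
                default_args=args,
                dag=sdag,
            )
    return sdag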
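With this approach something has to write the file before the factory reads it. Below is a minimal plain-Python sketch of both sides, assuming one task name per line; write_task_list and read_task_list are hypothetical names, and a temporary directory stands in for /path/to/generated/file.

```python
import os
import tempfile


def write_task_list(file_path, number_of_runs):
    # Hypothetical writer (e.g. called from a task): one task name per line.
    with open(file_path, "w") as f:
        for i in range(number_of_runs):
            f.write("%d\n" % (i + 1))


def read_task_list(file_path):
    # Reader used at DAG-parse time; a missing file yields no tasks
    # instead of raising an exception.
    if not os.path.exists(file_path):
        return []
    with open(file_path) as f:
        return ["task_" + line.strip() for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "generated_file")
    print(read_task_list(path))   # []
    write_task_list(path, 3)
    print(read_task_list(path))   # ['task_1', 'task_2', 'task_3']
```

Since the factory runs whenever the DAG file is parsed, this only works if the file is visible to every process that parses it (the scheduler and each worker), for example on shared storage.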

… the DAG … load_config …


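For option 1, you need to pass dag_id to xcom_pull. By default, xcom_pull looks for the task_id 'load_config' in the current DAG (the SubDAG), where that task does not exist.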

The xcom_pull call then becomes:

subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config', dag_id='" + PARENT_DAG_NAME + "') }}'" ),
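Why dag_id matters can be shown with a toy model. This is not Airflow's implementation: it only assumes that XComs are looked up by (dag_id, task_id) and that a missing dag_id falls back to the DAG of the calling task, which inside a SubDAG is PARENT.CHILD. The real XCom table is also keyed by execution date and key, which the sketch leaves out; ToyXComStore and the DAG names are hypothetical.

```python
class ToyXComStore:
    """Toy stand-in for Airflow's XCom table, keyed by (dag_id, task_id)."""

    def __init__(self):
        self._values = {}

    def push(self, dag_id, task_id, value):
        self._values[(dag_id, task_id)] = value

    def pull(self, current_dag_id, task_ids, dag_id=None):
        # Like xcom_pull, fall back to the caller's own DAG when dag_id is None.
        return self._values.get((dag_id or current_dag_id, task_ids))


PARENT_DAG_NAME = "parent_dag"   # hypothetical names
CHILD_DAG_NAME = "child_dag"
subdag_id = "%s.%s" % (PARENT_DAG_NAME, CHILD_DAG_NAME)

store = ToyXComStore()
# load_config runs in the parent DAG, so its XCom is stored under the parent's dag_id.
store.push(PARENT_DAG_NAME, "load_config", '{"number_of_runs": "3"}')

print(store.pull(subdag_id, "load_config"))                          # None
print(store.pull(subdag_id, "load_config", dag_id=PARENT_DAG_NAME))  # {"number_of_runs": "3"}
```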

… (…) … in Jaime's answer:

file_path = "/path/to/generated/file"



See my answer here, where I describe a way to dynamically create tasks based on the results of a previously completed task, using XComs and SubDAGs.


Source: https://habr.com/ru/post/1678540/

