I am new to Airflow.
I came across a scenario in which the parent DAG needs to pass some dynamic number (let n) into the Sub DAG.
Where this number will be used as SubDAG for the dynamic creation of parallel tasks n.
Airflow documentation does not cover how to achieve this. So I explored a couple of ways:
Option - 1 (using xcom Pull)
I tried to pass the xcom value, but for some reason SubDAG does not allow the passed value.
Parent Dag File
def load_dag(**kwargs):
number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
dag_data = json.dumps({
"number_of_runs": number_of_runs
})
return dag_data
load_config = PythonOperator(
task_id='load_config',
provide_context=True,
python_callable=load_dag,
dag=dag)
t1 = SubDagOperator(
task_id=CHILD_DAG_NAME,
subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
default_args=default_args,
dag=dag,
)
Sub Dag File
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=args,
schedule_interval=None)
variabe_names = {}
for i in range(num_of_runs):
variabe_names['task' + str(i + 1)] = DummyOperator(
task_id='dummy_task',
dag=dag_subdag,
)
return dag_subdag
Option - 2
I also tried passing number_of_runsas a global variable that did not work.
Option - 3
. sub DAG File doesn't exist error. , , .
- .