In Airflow, I ran into a problem where I need to pass the job_flow_id to one of my EMR steps. I can get the job_flow_id from the operator, but when I build the steps to submit to the cluster, the templated task_instance value is never rendered. I have the following code:
    from datetime import timedelta

    from airflow import DAG
    # Airflow 1.x import path; newer releases ship this operator in the
    # amazon provider package instead
    from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

    def issue_step(name, args):
        return [
            {
                "Name": name,
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "s3://....",
                    "Args": args
                }
            }
        ]

    dag = DAG('example',
              description='My dag',
              schedule_interval='0 8 * * 6',
              dagrun_timeout=timedelta(days=2))

    try:
        create_emr = EmrCreateJobFlowOperator(
            task_id='create_job_flow',
            aws_conn_id='aws_default',
            dag=dag
        )

        load_data_steps = issue_step('load', ['arg1', 'arg2'])

        # append the cluster id as an extra step argument, templated via XCom
        load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
        load_data_steps[0]["HadoopJarStep"]["Args"].append(
            "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")
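        # --- hypothetical continuation (not in the original post) ---
        # The snippet above stops inside the try block; presumably the
        # steps are then submitted with the standard EmrAddStepsOperator.
        # The task_id, the dependency wiring, and the except clause below
        # are assumptions added only to make the sketch complete.
        # Airflow 1.x import for it would be:
        #   from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
        load_data = EmrAddStepsOperator(
            task_id='load_data',
            # job_flow_id is declared as a templated field on this
            # operator, so this Jinja expression does get rendered
            job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
            aws_conn_id='aws_default',
            steps=load_data_steps,
            dag=dag
        )
        create_emr.set_downstream(load_data)
    except Exception:
        # the original post does not show the matching except clause;
        # re-raise as a placeholder
        raise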
The problem is that when I check EMR, instead of seeing --cluster-id j-1234 in the load_data step, I see the literal string --cluster-id {{ task_instance.xcom_pull('create_job_flow', key='return_value') }}, which causes my step to fail: the template inside the step arguments is never rendered.
How can I get the actual value inside my step function?
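For reference, my understanding is that Airflow only renders Jinja templates in the operator attributes listed in that operator's template_fields; anything else is passed through verbatim, which would match what I am seeing. A minimal sketch of that mechanism, where MyOperator and my_arg are made-up names and not part of any real library:

    from airflow.models import BaseOperator

    class MyOperator(BaseOperator):
        # only attributes named here are rendered by Jinja before execute()
        template_fields = ('my_arg',)

        def __init__(self, my_arg, *args, **kwargs):
            super(MyOperator, self).__init__(*args, **kwargs)
            self.my_arg = my_arg

        def execute(self, context):
            # by the time execute() runs, self.my_arg holds the rendered
            # value, e.g. an xcom_pull template has become the real id
            print(self.my_arg)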
Thank you and happy holidays.