Airflow - Task Instance in the EMR Operator

In Airflow, I ran into a problem: I need to pass the job_flow_id to one of my EMR steps. I can get the job_flow_id from the operator, but when I build the steps to send to the cluster, the task_instance template is not rendered. I have the following code:

    from datetime import timedelta

    from airflow import DAG
    from airflow.exceptions import AirflowException
    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
    from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
    from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
    from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor


    def issue_step(name, args):
        return [
            {
                "Name": name,
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "s3://....",
                    "Args": args
                }
            }
        ]


    dag = DAG('example',
              description='My dag',
              schedule_interval='0 8 * * 6',
              dagrun_timeout=timedelta(days=2))

    try:
        create_emr = EmrCreateJobFlowOperator(
            task_id='create_job_flow',
            aws_conn_id='aws_default',
            dag=dag
        )

        load_data_steps = issue_step('load', ['arg1', 'arg2'])
        load_data_steps[0]["HadoopJarStep"]["Args"].append('--cluster-id')
        load_data_steps[0]["HadoopJarStep"]["Args"].append(
            "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}")  # the value here is not replaced with the actual job_flow_id

        load_data = EmrAddStepsOperator(
            task_id='load_data',
            job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",  # this is correctly replaced with the job_flow_id - same for the others
            aws_conn_id='aws_default',
            steps=load_data_steps,
            dag=dag
        )

        check_load_data = EmrStepSensor(
            task_id='watch_load_data',
            job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
            step_id="{{ task_instance.xcom_pull('load_data', key='return_value')[0] }}",
            aws_conn_id='aws_default',
            dag=dag
        )

        cluster_remover = EmrTerminateJobFlowOperator(
            task_id='remove_cluster',
            job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
            aws_conn_id='aws_default',
            dag=dag
        )

        # The original referenced an undefined name here; create_emr is the task defined above.
        create_emr >> load_data
        load_data >> check_load_data
        check_load_data >> cluster_remover
    except AirflowException as ae:
        print(ae)

The problem is that when I check EMR, instead of seeing --cluster-id j-1234 in the load_data step, I see --cluster-id "{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}", which causes my step to fail.

How can I get the actual value inside my step function?

Thank you and happy holidays.

1 answer

I found out that there is a PR in the Airflow repository about this. The problem is that steps is not a templated field in EmrAddStepsOperator, so the Jinja expression inside it is never rendered. To work around this, I did the following:

  • Created a custom operator that inherits from EmrAddStepsOperator
  • Registered this operator as a plugin
  • Used the new operator in my DAG file
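To illustrate why listing steps in template_fields makes the difference, here is a toy model in plain Python. It is not Airflow's implementation: Airflow renders with Jinja2, while this sketch uses a simple regex, and ToyOperator, ToyOperatorWithSteps, render, and the cluster_id placeholder are all hypothetical names made up for the illustration. The only point it shows is that attributes missing from template_fields are passed through untouched, nested values included.

```python
import re

# Matches "{{ name }}" placeholders. Real Airflow uses Jinja2; this toy does not.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value, context):
    """Recursively substitute placeholders in strings, lists and dicts."""
    if isinstance(value, str):
        return PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, list):
        return [render(item, context) for item in value]
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    return value


class ToyOperator:
    # Only attributes named here are rendered before the operator runs.
    template_fields = ["job_flow_id"]

    def __init__(self, job_flow_id, steps):
        self.job_flow_id = job_flow_id
        self.steps = steps

    def render_templates(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class ToyOperatorWithSteps(ToyOperator):
    # Same idea as the workaround in this answer: also list `steps`.
    template_fields = ["job_flow_id", "steps"]


def make_steps():
    return [{"Name": "load",
             "HadoopJarStep": {"Args": ["--cluster-id", "{{ cluster_id }}"]}}]


context = {"cluster_id": "j-1234"}

plain = ToyOperator("{{ cluster_id }}", make_steps())
plain.render_templates(context)

fixed = ToyOperatorWithSteps("{{ cluster_id }}", make_steps())
fixed.render_templates(context)

print(plain.job_flow_id)                        # rendered
print(plain.steps[0]["HadoopJarStep"]["Args"])  # placeholder left as-is
print(fixed.steps[0]["HadoopJarStep"]["Args"])  # placeholder rendered
```

In the toy, plain behaves like the stock operator in the question (job_flow_id is replaced, the step argument is not), and fixed behaves like the subclass with the extended template_fields.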

Here is the code for the custom operator and the plugin, in the file custom_emr_add_step_operator.py (see the project tree below):

    from __future__ import division, absolute_import, print_function

    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils import apply_defaults

    from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator


    class CustomEmrAddStepsOperator(EmrAddStepsOperator):
        template_fields = ['job_flow_id', 'steps']  # override with steps to solve the issue above

        @apply_defaults
        def __init__(self, *args, **kwargs):
            super(CustomEmrAddStepsOperator, self).__init__(*args, **kwargs)

        def execute(self, context):
            # Return the parent's result so the step ids are still pushed to XCom
            # (the EmrStepSensor in the question pulls them from 'load_data').
            return super(CustomEmrAddStepsOperator, self).execute(context=context)


    # Defining the plugin class
    class CustomPlugin(AirflowPlugin):
        name = "custom_plugin"
        operators = [CustomEmrAddStepsOperator]

In my DAG file, I imported the operator from the plugin like this:

 from airflow.operators import CustomEmrAddStepsOperator 

The structure of my project and plugins is as follows:

    ├── config
    │   └── airflow.cfg
    ├── dags
    │   ├── __init__.py
    │   └── my_dag.py
    ├── plugins
    │   ├── __init__.py
    │   └── operators
    │       ├── __init__.py
    │       └── custom_emr_add_step_operator.py
    └── requirements.txt

If you are using an IDE such as PyCharm, it will complain that it cannot find the module, but the problem does not appear when you actually run Airflow. Also make sure that plugins_folder in your airflow.cfg points to the right plugins folder, so that Airflow can load your newly created plugin.
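If you want to double-check which folder your config points to, a small standalone script along these lines can help. This is only a sketch: the config text and the /path/to/project paths are placeholders rather than values from my setup, and read_plugins_folder is a hypothetical helper, not part of Airflow. It assumes plugins_folder lives in the [core] section of airflow.cfg.

```python
import configparser

# Placeholder airflow.cfg content; in practice, read the text of your real file.
SAMPLE_CFG = """
[core]
dags_folder = /path/to/project/dags
plugins_folder = /path/to/project/plugins
"""


def read_plugins_folder(cfg_text):
    """Return the plugins_folder value from the [core] section, or None."""
    # interpolation=None so that '%' characters elsewhere in the file
    # are not treated as configparser interpolation syntax.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return parser.get("core", "plugins_folder", fallback=None)


plugins_folder = read_plugins_folder(SAMPLE_CFG)
print(plugins_folder)
```

If the printed path is not the plugins directory from the tree above, Airflow will not pick up the custom operator.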


Source: https://habr.com/ru/post/1274303/