How to use xcoms airflow with MySqlOperator

def mysql_operator_test(): DEFAULT_DATE = datetime(2017, 10, 9) t = MySqlOperator( task_id='basic_mysql', sql="SELECT count(*) from table 1 where id>100;", mysql_conn_id='mysql_default', dag=dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=False) run_this = PythonOperator( task_id='getRecoReq', python_callable=mysql_operator_test, # xcom_push=True, dag=dag) task2 = PythonOperator( task_id= 'mysql_select', provide_context=True, python_callable = blah, templates_dict = {'requests': "{{ ti.xcom_pull(task_ids='getReq') }}" }, dag=dag) run_this.set_downstream(task2) 

I want to capture the invoice returned by MySqlOperator using xcoms. Can anyone consult on the same?

+5
source share
1 answer

You are very close! However, the way you ask this question is a kind of anti-pattern. You do not want to share task data in Airflow. In addition, you do not want to use a statement, as in mysql_operator_test . It is tempting, I did the same when I started.

I tried something very similar to this, but with SFTP connections. In the end, I just did everything inside PythonOperator and used basic hooks.

I would recommend using MySQLHook inside python_callable . Something like that:

 def count_mysql_and_then_use_the_count(): """ Returns an SFTP connection created using the SSHHook """ mysql_hook = MySQLHook(...) cur = conn.cursor() cur.execute("""SELECT count(*) from table 1 where id>100""") for count in cur: # Do something with the count... 

I'm not sure if this will work, but the idea is that you use a hook inside your Python, I often use MySQLHook , but I did it with SSHHook and it works great.

+1
source

Source: https://habr.com/ru/post/1272504/


All Articles