Make Custom Airflow Macros Extend Other Macros

Is there a way to make a custom macro in Airflow that itself computes from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

This uses the reverse porting of the new Airflow v1.8 macro next_execution_dateto work in Airflow v1.7. Unfortunately, this template is displayed without a macro extension:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"
+12
source share
2 answers

Here are some solutions:

1. Override BashOperatorto add some values ​​to the context

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_templates(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)

A good part of this approach: you can write some repeating code in your custom statement.

: , .

2.

. .

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

: , ( XCom, , ..), .

( ): , , , .

3.

( ) , , .

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

, . , (, task, task_instance ..); (, macros.time ,...).

+15

user_defined_macros . user_defined_macro ( params), templating :

class DoubleTemplatedBashOperator(BashOperator):
    def pre_execute(self, context):
        context['ti'].render_templates()

, UDM. , "" .

UDM BashOperator ( ):

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)
+3

Source: https://habr.com/ru/post/1680528/


All Articles