I am trying to add custom filters for my jinja2 templates for airflow.
Since my folders in S3 are like
/ year / month / day /
my goal is to use the yesterday function on the Variables screen as follows:
s3: //logs.web.com/AWSLogs/ {{yesterday_ds | get_year}} / {{yesterday_ds | get_month}} / {{yesterday_ds | get_day}} /
I saw in PR (which I think has already been merged ..) that you can do this with the user_defined_filters parameter in the dag_args parameter in the creation of the dag object here
The problem is that even so, he says "there is no filter named get_year", for example.
This is my code:
dag.py
dag = DAG(
dag_id='dag-name',
default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
template_searchpath=tmpl_search_path,
schedule_interval=timedelta(days=1),
max_active_runs=1,
)
utils.py
def get_dag_args(**kwargs):
return {
'owner' : kwargs.get('owner', 'owner_name'),
'depends_on_past' : kwargs.get('depends_on_past', False),
'start_date' : kwargs.get('start_date', datetime.now() - timedelta(1)),
'email' : kwargs.get('email', ['blabla@blabla.com']),
'retries' : kwargs.get('retries', 5),
'provide_context' : kwargs.get('provide_context', True),
'retry_delay' : kwargs.get('retry_delay', timedelta(minutes=5)),
'user_defined_filters': get_date_filters()
}
def get_date_filters():
return dict(
get_year=lambda date_string: date_string.strftime('%Y'),
get_month=lambda date_string: date_string.strftime('%m'),
get_day=lambda date_string: date_string.strftime('%d'),
)
Does anyone see where I'm wrong? Thank!
EDIT
dag, , : (.
jinja_env = dag.get_template_env()
print(jinja_env.filters)
, DAG, @tests/models.py:
Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'
2
, , 1.8.0, . - , 1.8.2rc pip? ?