Airflow - Python file NOT in the same DAG folder

I am trying to use Airflow to perform a simple python task.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

If I try, for example:

airflow test python_test print 2015-01-01

It works!

Now I want to put my function def print_context(ds, **kwargs) in another Python file. So I create another file named simple_test.py and modify:

import simple_test  # simple_test.py sits in the same DAG folder as this file

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
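The question doesn't show simple_test.py itself. Here is a minimal sketch, assuming it simply holds the function moved out of the DAG file and sits directly in the DAG folder, and assuming Airflow puts that folder on sys.path (which would explain why a plain import simple_test resolves). A temporary directory stands in for the DAG folder, and the helper name import_from_dags_folder is hypothetical:

```python
import importlib
import os
import sys
import tempfile
import textwrap

# Hypothetical contents of simple_test.py: print_context moved out of the DAG file.
SIMPLE_TEST_SRC = textwrap.dedent('''\
    from pprint import pprint


    def print_context(ds, **kwargs):
        pprint(kwargs)
        print(ds)
        return 'Whatever you return gets printed in the logs'
    ''')


def import_from_dags_folder(dags_folder, module_name):
    """Import module_name with dags_folder on sys.path, as a DAG file would see it."""
    if dags_folder not in sys.path:
        sys.path.insert(0, dags_folder)
    importlib.invalidate_caches()  # the file was just written to disk
    return importlib.import_module(module_name)


# A temporary directory stands in for the real DAG folder in this sketch.
dags_folder = tempfile.mkdtemp()
with open(os.path.join(dags_folder, "simple_test.py"), "w") as f:
    f.write(SIMPLE_TEST_SRC)

simple_test = import_from_dags_folder(dags_folder, "simple_test")
# Call the callable directly with a hand-built context instead of Airflow's.
result = simple_test.print_context(ds="2015-01-01", task_id="print")
print(result)
```

Calling the callable with a hand-built context like this is also a cheap way to unit-test it without running the scheduler.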

Now I'm trying to run again:

airflow test python_test print 2015-01-01

And OK! It still works!

But if I create a module, for example a worker module containing a file SimplePython.py, import it (from worker import SimplePython), and try:

airflow test python_test print 2015-01-01

It gives a message:

ImportError: no module named worker

Questions:

  • Can I import a module inside a DAG definition?
  • How is Airflow + Celery going to distribute all the necessary Python source files across the worker nodes?

You can package your DAGs together with their dependencies, as described in the documentation:

https://airflow.apache.org/concepts.html#packaged-dags

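Airflow can load a zip file that contains your DAG files along with the extra Python modules they need. For example, you can create a ZIP file with the following contents: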

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

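Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories, as these are considered to be potential packages.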
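To see why this layout works, here is a stdlib-only sketch. It builds the zip described above (the module bodies and the greet function are hypothetical), lists the root-level .py files, which are the ones treated as DAG files, and then imports one of them with the archive itself on sys.path. I'm assuming Python's zipimport behaviour is what lets package1 resolve inside a packaged DAG, so treat this as an illustration, not Airflow's actual loader:

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_packaged_dag(zip_path):
    """Write the example layout; the module bodies are made up for illustration."""
    helper_import = "from package1.functions import greet\n"
    with zipfile.ZipFile(zip_path, "w") as zf:
        zf.writestr("my_dag1.py", helper_import + "RESULT = greet('my_dag1')\n")
        zf.writestr("my_dag2.py", helper_import + "RESULT = greet('my_dag2')\n")
        zf.writestr("package1/__init__.py", "")
        zf.writestr("package1/functions.py",
                    "def greet(name):\n    return 'hello from ' + name\n")


def root_dag_files(zip_path):
    """Only .py files at the zip root count as DAG files; subfolders are packages."""
    with zipfile.ZipFile(zip_path) as zf:
        return sorted(n for n in zf.namelist() if n.endswith(".py") and "/" not in n)


zip_path = os.path.join(tempfile.mkdtemp(), "my_dags.zip")
build_packaged_dag(zip_path)
print(root_dag_files(zip_path))  # ['my_dag1.py', 'my_dag2.py']

# With the archive on sys.path, zipimport can resolve package1 from inside it.
sys.path.insert(0, zip_path)
my_dag1 = importlib.import_module("my_dag1")
print(my_dag1.RESULT)  # hello from my_dag1
```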

When using the CeleryExecutor, you have to sync the DAG directories yourself; Airflow does not do it for you:

https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery

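The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means.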
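Since Airflow leaves the syncing to you, a cheap sanity check is to compare a fingerprint of DAGS_FOLDER on the scheduler and on each worker. The sketch below hashes relative paths and file contents with hashlib; the function names and the idea of comparing digests across machines are my own illustration, not an Airflow feature:

```python
import hashlib
import os
import tempfile


def dags_folder_fingerprint(dags_folder):
    """SHA-256 over each file's relative path and bytes, walked in a stable order."""
    digest = hashlib.sha256()
    for root, dirs, files in os.walk(dags_folder):
        # Sort in place so os.walk visits subfolders deterministically; skip bytecode caches.
        dirs[:] = sorted(d for d in dirs if d != "__pycache__")
        for name in sorted(files):
            if name.endswith(".pyc"):
                continue
            path = os.path.join(root, name)
            rel = os.path.relpath(path, dags_folder).replace(os.sep, "/")
            digest.update(rel.encode("utf-8") + b"\0")
            with open(path, "rb") as f:
                digest.update(f.read() + b"\0")
    return digest.hexdigest()


def write_tree(base, files):
    """Create files (relative path -> text) under base."""
    for rel, text in files.items():
        path = os.path.join(base, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(text)


layout = {"python_test.py": "# DAG file\n", "worker/__init__.py": "",
          "worker/SimplePython.py": "X = 1\n"}
scheduler_copy, worker_copy = tempfile.mkdtemp(), tempfile.mkdtemp()
write_tree(scheduler_copy, layout)
write_tree(worker_copy, layout)
same_before = dags_folder_fingerprint(scheduler_copy) == dags_folder_fingerprint(worker_copy)

# Simulate a worker that missed an update.
write_tree(scheduler_copy, {"worker/SimplePython.py": "X = 2\n"})
same_after = dags_folder_fingerprint(scheduler_copy) == dags_folder_fingerprint(worker_copy)
print(same_before, same_after)  # True False
```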


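While packaging your DAGs into a zip, as covered in the docs, is the only supported solution I have seen, you can also import modules that are inside the dags folder. This is useful if you sync the dags folder automatically with other tools such as puppet & git.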

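I am not sure about your directory structure from the question, so here is an example dags folder based on a typical Python project structure: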

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root
        ├── my_dags  # python src root (usually named same as project)
        │   ├── my_test_globals.py  # file I want to import
        │   ├── dag_in_package.py
        │   └── dags
        │        └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

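I have left out the __init__.py files that are required [1]. Note the location of the three example DAGs: you would almost certainly use only one of these places for all your DAGs, but I include them all here for the sake of example, because it shouldn't matter for the import. To import my_test_globals from any of them: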

from my_dags.my_dags import my_test_globals

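I believe this means that Airflow runs with the Python path set to the dags directory, so each subdirectory of the dags folder can be treated as a Python package. In my case, it was the additional intermediate project root directory that got in the way of doing a typical intra-package absolute import. Thus, we could restructure this Airflow project like this: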

└── airflow/dags  # root airflow dags folder where all dags live
    └── my_dags  # git repo project root & python src root
        ├── my_test_globals.py  # file I want to import
        ├── dag_in_package.py
        ├── dags
        │    └── dag_in_subpackage.py
        ├── README.md  # also setup.py, LICENSE, etc here
        └── dag_in_project_root.py

So that imports look the way we expect them to:

from my_dags import my_test_globals
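The reasoning about the Python path can be checked without Airflow. The sketch below builds both layouts in temporary directories standing in for airflow/dags, puts each on sys.path (assuming, as the answer believes, that this is what Airflow does with the dags folder), and imports my_test_globals with the matching statement. It adds the __init__.py files the answer left out; the LAYOUT constant and the helper names are hypothetical:

```python
import importlib
import os
import sys
import tempfile


def write_tree(base, files):
    """Create files (relative path -> text) under base."""
    for rel, text in files.items():
        path = os.path.join(base, *rel.split("/"))
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w") as f:
            f.write(text)


def import_from(dags_folder, dotted_name):
    """Import dotted_name with dags_folder on sys.path, then undo the side effects."""
    top = dotted_name.split(".")[0]
    sys.path.insert(0, dags_folder)
    try:
        importlib.invalidate_caches()
        return importlib.import_module(dotted_name)
    finally:
        sys.path.remove(dags_folder)
        # Forget the cached package so the next layout can reuse the name my_dags.
        for name in [m for m in sys.modules if m == top or m.startswith(top + ".")]:
            del sys.modules[name]


# Layout 1: git repo root and Python source root are both called my_dags.
nested = tempfile.mkdtemp()  # stands in for airflow/dags
write_tree(nested, {
    "my_dags/__init__.py": "",
    "my_dags/my_dags/__init__.py": "",
    "my_dags/my_dags/my_test_globals.py": "LAYOUT = 'nested'\n",
})

# Layout 2: the repo root is also the Python source root.
flat = tempfile.mkdtemp()
write_tree(flat, {
    "my_dags/__init__.py": "",
    "my_dags/my_test_globals.py": "LAYOUT = 'flat'\n",
})

nested_globals = import_from(nested, "my_dags.my_dags.my_test_globals")
flat_globals = import_from(flat, "my_dags.my_test_globals")
print(nested_globals.LAYOUT, flat_globals.LAYOUT)  # nested flat
```

The only difference between the two import statements is how many package levels sit between the dags folder and my_test_globals.py.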

For your first question: yes, it is possible.

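I think you should create an empty file named __init__.py in the same directory as SimplePython.py (the worker directory in your case). That way, the worker directory will be treated as a Python module.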

Then, in your DAG definition, try from worker.SimplePython import print_context.
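A sketch of that fix, run against a temporary directory standing in for the DAG folder (assumed to be the folder Airflow puts on sys.path). It creates worker/SimplePython.py with a hypothetical print_context, shows the import failing while the folder containing worker/ is not on sys.path (the kind of ImportError from the question), then adds the empty worker/__init__.py and imports print_context. On Python 3 a directory without __init__.py can still be imported as a namespace package; the explicit file is what Python 2, which the question appears to use, requires:

```python
import importlib
import os
import sys
import tempfile

# A temporary directory stands in for the DAG folder; worker/ lives inside it.
dags_folder = tempfile.mkdtemp()
worker_dir = os.path.join(dags_folder, "worker")
os.makedirs(worker_dir)
with open(os.path.join(worker_dir, "SimplePython.py"), "w") as f:
    f.write(
        "def print_context(ds, **kwargs):\n"
        "    print(ds)\n"
        "    return 'printed ' + ds\n"
    )

# While the folder containing worker/ is not on sys.path, the import fails.
try:
    importlib.import_module("worker.SimplePython")
    import_failed = False
except ImportError:
    import_failed = True

# The fix from the answer: an empty __init__.py marks worker/ as a package,
# and the folder that contains worker/ has to be on sys.path.
open(os.path.join(worker_dir, "__init__.py"), "w").close()
sys.path.insert(0, dags_folder)
importlib.invalidate_caches()

from worker.SimplePython import print_context

print(import_failed)
print(print_context("2015-01-01"))
```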

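In your case, I think it would be better to write an Airflow plugin, because you may want to upgrade the Airflow core project without removing your custom functions.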


Q: How is Airflow + Celery going to distribute all the necessary Python source files across the worker nodes?

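A: From the documentation: the worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipeline files shared there should work as well.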

http://pythonhosted.org/airflow/installation.html?highlight=chef
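The documentation leaves the mechanism open. As a hedged illustration of the simplest "by your own means" option, here is a one-way mirror of a source checkout into a worker's DAGS_FOLDER using only shutil. The function names and paths are hypothetical; in practice git pull, rsync, or your configuration-management tool would do this job:

```python
import os
import shutil
import tempfile


def mirror_dags_folder(source, target):
    """One-way sync: afterwards target contains exactly the files in source."""
    staging = target.rstrip(os.sep) + ".staging"
    if os.path.exists(staging):
        shutil.rmtree(staging)
    # Copy into a staging folder first, so a failed copy leaves the old target intact.
    shutil.copytree(source, staging, ignore=shutil.ignore_patterns("__pycache__", "*.pyc"))
    if os.path.exists(target):
        shutil.rmtree(target)  # also drops files that were deleted upstream
    os.rename(staging, target)


def list_files(folder):
    """Sorted relative paths (with '/' separators) of every file under folder."""
    found = []
    for root, _dirs, files in os.walk(folder):
        for name in files:
            rel = os.path.relpath(os.path.join(root, name), folder)
            found.append(rel.replace(os.sep, "/"))
    return sorted(found)


source = tempfile.mkdtemp()  # e.g. a fresh git checkout of the DAG repo
target = os.path.join(tempfile.mkdtemp(), "dags")  # a worker's DAGS_FOLDER
os.makedirs(os.path.join(source, "worker"))
for rel in ("python_test.py", "worker/__init__.py", "worker/SimplePython.py"):
    open(os.path.join(source, *rel.split("/")), "w").close()
os.makedirs(target)
open(os.path.join(target, "old_dag.py"), "w").close()  # stale file on the worker

mirror_dags_folder(source, target)
print(list_files(target))
```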


Source: https://habr.com/ru/post/1614358/

