La circulation de l'air - fichier Python PAS dans la même DAG dossier

Je suis en train d'utiliser le flux d'Air à l'exécution d'une tâche simple en python.

from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta


from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)

Si j'essaie, par exemple:

la circulation de l'air test python_test impression 2015-01-01

Ça marche!!!

Maintenant, je veux mettre mon def print_context(ds, **kwargs) fonction dans d'autres fichier python. J'ai donc créer antoher fichier appelé: simple_test.py et le changement:

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)

Maintenant, j'essaie de courir à nouveau:

la circulation de l'air test python_test impression 2015-01-01

Et OK! Il encore du travail!

Mais si je crée un module, par exemple, un travailleur module avec fichier SimplePython.py, à l'importation (from worker import SimplePython)et de l'essayer:

la circulation de l'air test python_test impression 2015-01-01

Il donne le message :

ImportError: No module named travailleur

Les questions:

  1. Est-il possible d'importer un module à l'intérieur d'un DAG définition?
  2. Comment la circulation de l'Air+Céleri va distribuer tout le nécessaire python sources des fichiers sur les nœuds de travail?

OriginalL'auteur p.magalhaes | 2015-11-03