La circulation de l'air - fichier Python PAS dans la même DAG dossier
Je suis en train d'utiliser le flux d'Air à l'exécution d'une tâche simple en python.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
Si j'essaie, par exemple:
la circulation de l'air test python_test impression 2015-01-01
Ça marche!!!
Maintenant, je veux mettre mon def print_context(ds, **kwargs)
fonction dans d'autres fichier python. J'ai donc créer antoher fichier appelé: simple_test.py et le changement:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
Maintenant, j'essaie de courir à nouveau:
la circulation de l'air test python_test impression 2015-01-01
Et OK! Il encore du travail!
Mais si je crée un module, par exemple, un travailleur module avec fichier SimplePython.py
, à l'importation (from worker import SimplePython
)et de l'essayer:
la circulation de l'air test python_test impression 2015-01-01
Il donne le message :
ImportError: No module named travailleur
Les questions:
- Est-il possible d'importer un module à l'intérieur d'un DAG définition?
- Comment la circulation de l'Air+Céleri va distribuer tout le nécessaire python sources des fichiers sur les nœuds de travail?
OriginalL'auteur p.magalhaes | 2015-11-03
Vous devez vous connecter pour publier un commentaire.
Vous pouvez les dépendances de package de votre DAG par:
https://airflow.apache.org/concepts.html#packaged-dags
Lors de l'utilisation de CeleryExecutor, vous avez besoin pour synchroniser manuellement DAG répertoires, le débit d'Air n'est pas prendre soin de cela pour vous:
https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery
OriginalL'auteur ImDarrenG
Pour votre première question, c'est possible.
Et je pense que vous devriez créer un fichier vide nommé
__init__.py
dans le même répertoire avecSimplePython.py
(C'estworker
répertoire dans votre cas). En faisant celaworker
répertoire sera considéré comme un module python.Puis dans votre DAG définition, essayez
from worker.SimplePython import print_context
.Dans votre cas, je suppose que ce serait mieux si vous écrivez un plugin pour la circulation de l'air, car vous risquez de vous souhaitez mettre à niveau la circulation de l'air de projet de base sans enlever vos fonctions sur mesure.
OriginalL'auteur Yongyiw
Pour votre deuxième question : Comment la circulation de l'Air+Céleri va distribuer tout le nécessaire python sources des fichiers sur les nœuds de travail?
À partir de la documentation : Le travailleur doit avoir accès à ses DAGS_FOLDER, et vous avez besoin de synchroniser les systèmes de fichiers par vos propre moyen. Une installation commune serait de stocker votre DAGS_FOLDER dans un dépôt Git et la synchronisation entre les machines à l'aide de la Chef, Puppet, Ansible, ou ce que vous utilisez pour configurer les ordinateurs de votre environnement. Si vos boîtes ont en commun un point de montage, d'avoir votre pipelines fichiers partagés, il devrait fonctionner aussi bien
http://pythonhosted.org/airflow/installation.html?highlight=chef
OriginalL'auteur nono
Tandis que l'emballage de votre dags dans un zip abordés dans les docs est la seule solution que j'ai vu, vous pouvez également faire importations de modules qui sont à l'intérieur de la dags dossier. Ceci est utile si vous synchronisez les dags dossier automatiquement en utilisant d'autres outils comme puppet & git.
Je ne suis pas clair sur votre structure de répertoire à partir de la question, voici un exemple dags dossier basé sur un projet de python structure:
J'ai laissé de côté l' ([ Un ])
__init__.py
fichiers. Notez l'emplacement des trois exemple dags. Vous serait presque certainement utiliser un seul de ces endroits pour tous vos dags. J'ai tous les inclure ici pour des raisons d'exemple car il ne devrait pas question pour l'importation. Pour importermy_test_globals
de l'un d'eux:Je crois que cela signifie que la circulation de l'air des charges de chaque sous-répertoire de la dags dossier comme un paquet python. Dans mon cas, il était l'intermédiaire additionnel répertoire racine du projet de monter dans la façon de faire un typique intra-package absolue de l'importation. Ainsi, nous avons pu restructurer cet air de projet comme ceci:
De sorte que les importations regard que nous attendons d'eux:
OriginalL'auteur 7yl4r