La circulation de l'air à l'aide de fichiers de modèle pour PythonOperator
La méthode de l'obtention d'un BashOperator
ou SqlOperator
pour ramasser un fichier externe pour son modèle est un peu clair et documenté, mais en regardant les PythonOperator
mon test de ce que je comprends de la documentation n'est pas de travail. Je ne suis pas sûr de savoir comment les templates_exts
et templates_dict
paramètres correctement interagissent pour ramasser un fichier.
Dans mon dags dossier que j'ai créé: pyoptemplate.sql
et pyoptemplate.t
ainsi que test_python_operator_template.py
:
pyoptemplate.sql:
SELECT * FROM {{params.table}};
pyoptemplate.t:
SELECT * FROM {{params.table}};
test_python_operator_template.py:
# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import pprint
pp = pprint.PrettyPrinter(indent=4)
def templated_function(ds, **kwargs):
"""This function will try to use templates loaded from external files"""
pp.pprint(ds)
pp.pprint(kwargs)
# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
default_args={"owner": "lamblin",
"start_date": datetime.now()},
template_searchpath=['/Users/daniellamblin/airflow/dags'],
schedule_interval='@once')
# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
provide_context=True,
python_callable=templated_function,
templates_dict={
'pyoptemplate': '',
'pyoptemplate.sql': '',
'sql': 'pyoptemplate',
'file1':'pyoptemplate.sql',
'file2':'pyoptemplate.t',
'table': '{{params.table}}'},
templates_exts=['.sql','.t'],
params={'condition_param': True,
'message': 'Hello World',
'table': 'TEMP_TABLE'},
dag=dag)
Le résultat d'une course montre que table
a été modélisé correctement comme une chaîne de caractères, mais les autres ne tirez pas dans les fichiers pour la création de modèles.
dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{ u'END_DATE': '2017-01-18',
u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
u'dag': <DAG: test_python_operator_template_dag>,
u'dag_run': None,
u'ds_nodash': u'20170118',
u'end_date': '2017-01-18',
u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
u'latest_date': '2017-01-18',
u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
u'params': { 'condition_param': True,
'message': 'Hello World',
'table': 'TEMP_TABLE'},
u'run_id': None,
u'tables': None,
u'task': <Task(PythonOperator): test_python_operator_template>,
u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
'templates_dict': { 'file1': u'pyoptemplate.sql',
'file2': u'pyoptemplate.t',
'pyoptemplate': u'',
'pyoptemplate.sql': u'',
'sql': u'pyoptemplate',
'table': u'TEMP_TABLE'},
u'test_mode': True,
u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'tomorrow_ds': '2017-01-19',
u'tomorrow_ds_nodash': u'20170119',
u'ts': '2017-01-18T00:00:00',
u'ts_nodash': u'20170118T000000',
u'yesterday_ds': '2017-01-17',
u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None
- En fait, si quelqu'un peut lien à un cas de fichier de template avec un autre opérateur, peut-être que ce serait aussi m'aider.
- Honnêtement, je pense que ce n'est actuellement pas implémenté pour
PythonOperator
. - ont ajouté un exemple de travail pour bash_operator, peut-être que ce serait vous aider à bidouiller le python opérateur de faire votre appel d'offres
Vous devez vous connecter pour publier un commentaire.
Que de la circulation de l'Air 1.8, la façon dont le PythonOperator remplace son
template_ext
champ dans__init__
ne fonctionne pas. La tâche de vérifiertemplate_ext
sur le__class__
. Pour créer un PythonOperator qui ramasse SQL fichiers de modèle vous avez seulement besoin de faire ce qui suit:Et puis pour accéder à la SQL de votre tâche lorsqu'il s'exécute:
Récemment je suis tombé sur le même sujet, et enfin résolu. @Ardan 's la solution est correcte mais juste envie de répéter avec une réponse plus complète, avec quelques détails sur la façon dont le flux d'Air fonctionne pour les nouveaux arrivants.
Bien sûr, vous premier besoin d'un de ces:
En supposant que vous avez un fichier de modèle sql comme ci-dessous:
D'abord assurez-vous d'ajouter votre dossier dans le chemin de recherche dans votre dag params.
Ne passe pas template_searchpath à args et puis passer des arguments à DAG!!!! Il ne fonctionne pas.
Alors votre opérateur téléphonique sera
Votre fonction sera:
Quelques explications:
La circulation de l'air utilise les valeurs de contexte afin de rendre votre modèle. Pour ajouter manuellement le contexte, vous pouvez utiliser les paramètres de champ, comme ci-dessus.
PythonOperator ne pas prendre le modèle de l'extension de fichier à partir de la template_ext champ plus comme @Ardan mentionné. Le code source est ici.
Il ne prend de l'extension de auto.__la classe__.template_ext.
Boucles de circulation de l'air à travers le template_dict terrain, et si la valeur.endswith(file_extension) == Vrai, alors il rend le modèle.
Je ne pense pas que ce soit vraiment possible. Mais la solution de contournement suivante pourrait être utile:
Aimerais une meilleure solution, si!
templates_dict
est rendu et il y a untemplates_exts
mais le dict ne pas utiliser les extraits d'extraire automatiquement les fichiers que l'on peut attendre frorm d'autres opérateurs.Impossible d'obtenir un fichier de script basé sur un modèle en python de travail (python). Mais, par exemple, avec bash opérateur est la suivante, peut-être que peut vous donner quelques indications
l'01.sh script ressemble suit
Cela donnera une sortie comme suit sur l'exécution du test de