La circulation de l'air à l'aide de fichiers de modèle pour PythonOperator

La méthode de l'obtention d'un BashOperator ou SqlOperator pour ramasser un fichier externe pour son modèle est un peu clair et documenté, mais en regardant les PythonOperator mon test de ce que je comprends de la documentation n'est pas de travail. Je ne suis pas sûr de savoir comment les templates_exts et templates_dict paramètres correctement interagissent pour ramasser un fichier.

Dans mon dags dossier que j'ai créé: pyoptemplate.sql et pyoptemplate.t ainsi que test_python_operator_template.py:

pyoptemplate.sql:

SELECT * FROM {{params.table}};

pyoptemplate.t:

SELECT * FROM {{params.table}};

test_python_operator_template.py:

# coding: utf-8
# vim:ai:si:et:sw=4 ts=4 tw=80
"""
# A Test of Templates in PythonOperator
"""

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def templated_function(ds, **kwargs):
    """This function will try to use templates loaded from external files"""
    pp.pprint(ds)
    pp.pprint(kwargs)


# Define the DAG
dag = DAG(dag_id='test_python_operator_template_dag',
          default_args={"owner": "lamblin",
                        "start_date": datetime.now()},
          template_searchpath=['/Users/daniellamblin/airflow/dags'],
          schedule_interval='@once')


# Define the single task in this controller example DAG
op = PythonOperator(task_id='test_python_operator_template',
                    provide_context=True,
                    python_callable=templated_function,
                    templates_dict={
                        'pyoptemplate': '',
                        'pyoptemplate.sql': '',
                        'sql': 'pyoptemplate',
                        'file1':'pyoptemplate.sql',
                        'file2':'pyoptemplate.t',
                        'table': '{{params.table}}'},
                    templates_exts=['.sql','.t'],
                    params={'condition_param': True,
                            'message': 'Hello World',
                            'table': 'TEMP_TABLE'},
                    dag=dag)

Le résultat d'une course montre que table a été modélisé correctement comme une chaîne de caractères, mais les autres ne tirez pas dans les fichiers pour la création de modèles.

dlamblin$ airflow test test_python_operator_template_dag test_python_operator_template 2017-01-18
[2017-01-18 23:58:06,698] {__init__.py:36} INFO - Using executor SequentialExecutor
[2017-01-18 23:58:07,342] {models.py:154} INFO - Filling up the DagBag from /Users/daniellamblin/airflow/dags
[2017-01-18 23:58:07,620] {models.py:1196} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2017-01-18 23:58:07,620] {models.py:1219} INFO - Executing <Task(PythonOperator): test_python_operator_template> on 2017-01-18 00:00:00
'2017-01-18'
{   u'END_DATE': '2017-01-18',
u'conf': <module 'airflow.configuration' from '/Library/Python/2.7/site-packages/airflow/configuration.pyc'>,
u'dag': <DAG: test_python_operator_template_dag>,
u'dag_run': None,
u'ds_nodash': u'20170118',
u'end_date': '2017-01-18',
u'execution_date': datetime.datetime(2017, 1, 18, 0, 0),
u'latest_date': '2017-01-18',
u'macros': <module 'airflow.macros' from '/Library/Python/2.7/site-packages/airflow/macros/__init__.pyc'>,
u'params': {   'condition_param': True,
'message': 'Hello World',
'table': 'TEMP_TABLE'},
u'run_id': None,
u'tables': None,
u'task': <Task(PythonOperator): test_python_operator_template>,
u'task_instance': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'task_instance_key_str': u'test_python_operator_template_dag__test_python_operator_template__20170118',
'templates_dict': {   'file1': u'pyoptemplate.sql',
'file2': u'pyoptemplate.t',
'pyoptemplate': u'',
'pyoptemplate.sql': u'',
'sql': u'pyoptemplate',
'table': u'TEMP_TABLE'},
u'test_mode': True,
u'ti': <TaskInstance: test_python_operator_template_dag.test_python_operator_template 2017-01-18 00:00:00 [running]>,
u'tomorrow_ds': '2017-01-19',
u'tomorrow_ds_nodash': u'20170119',
u'ts': '2017-01-18T00:00:00',
u'ts_nodash': u'20170118T000000',
u'yesterday_ds': '2017-01-17',
u'yesterday_ds_nodash': u'20170117'}
[2017-01-18 23:58:07,634] {python_operator.py:67} INFO - Done. Returned value was: None
  • En fait, si quelqu'un peut lien à un cas de fichier de template avec un autre opérateur, peut-être que ce serait aussi m'aider.
  • Honnêtement, je pense que ce n'est actuellement pas implémenté pour PythonOperator.
  • ont ajouté un exemple de travail pour bash_operator, peut-être que ce serait vous aider à bidouiller le python opérateur de faire votre appel d'offres
InformationsquelleAutor dlamblin | 2017-01-19