气流:如何从PostgreOperator推动xcom的价值? [英] Airflow: How to push xcom value from PostgreOperator?

查看:90
本文介绍了气流:如何从PostgreOperator推动xcom的价值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是Airflow 1.8.1,我想推送来自PostgreOperator的sql请求的结果。

I'm using Airflow 1.8.1 and I want to push the result of a sql request from PostgreOperator.

这是我的任务:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print 'count ----> ', value
    if value == 0:
       return 'next_task'
    else:
       return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)

,这是我的SQL脚本:

and here is my sql script:

select count(1) from table

c $ c>它检索 none 值。

when i check the xcom value from check_task it retrieves none value.

推荐答案

最后,我在插件管理器中的 $ AIRFLOW_HOME / plugins 下创建了一个新的Sensor ExecuteSqlOperator

Finally, I created a new Sensor ExecuteSqlOperator in the plugin manager under $AIRFLOW_HOME/plugins.

我以 CheckOperator 为例,并修改了返回值:此运算符的基本运行与我所需要的完全相反。

I used CheckOperator as an example and I modified the returned value: the basic running of this operator was exactly the reverse of what I needed.

这是默认 ExecuteSqlOperator 的值:
CheckOperator

这是我自定义的 SqlSensor ReverseSqlSensor

class SqlExecuteOperator(BaseOperator):
    """
    Performs checks against a db. The ``CheckOperator`` expects
    a sql query that will return a single row.

    Note that this is an abstract class and get_db_hook
    needs to be defined. Whereas a get_db_hook is hook that gets a
    single record from an external source.
    :param sql: the sql to be executed
    :type sql: string
    """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context=None):
        logging.info('Executing SQL statement: ' + self.sql)
        records = self.get_db_hook().get_first(self.sql)
        logging.info("Record: " + str(records))
        records_int = int(records[0])
        print (records_int)
        return records_int

    def get_db_hook(self):
        return BaseHook.get_hook(conn_id=self.conn_id)

这篇关于气流:如何从PostgreOperator推动xcom的价值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆