How to obtain and process MySQL records using Airflow?

Problem description

I need to:

1. Run a SELECT query on a MySQL database and fetch the records.
2. Process the records with a Python script.

I am unsure how to proceed. Is XCom the way to go here? Also, MySqlOperator only executes the query; it doesn't fetch the records. Is there a built-in transfer operator I can use? How can I use a MySQL hook here?

You may want to use a PythonOperator that uses the hook to get the data, applies the transformation, and ships the (now scored) rows back to some other place.
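That advice can be sketched as a single callable, assuming Airflow 2 with the MySQL provider installed. It fetches rows with get_records(), transforms each row, and writes the results with insert_rows(); both methods come from DbApiHook, which MySqlHook inherits. The hooks are passed in as arguments so the logic can be exercised without a live database. The scoring rule in score_row, the tables my_table and scored_table, and their columns are hypothetical placeholders.

```python
from typing import Any, Callable, List, Sequence, Tuple

Row = Tuple[Any, ...]


def score_row(row: Sequence[Any]) -> Row:
    """Hypothetical transformation: append a score derived from the second column."""
    return tuple(row) + (row[1] * 2,)


def fetch_score_ship(src_hook: Any, dst_hook: Any, sql: str, target_table: str,
                     transform: Callable[[Sequence[Any]], Row] = score_row) -> int:
    """Fetch rows with src_hook.get_records(), transform each row, and write the
    results with dst_hook.insert_rows(). Returns the number of rows written."""
    rows = src_hook.get_records(sql)
    scored: List[Row] = [transform(row) for row in rows]
    if scored:  # skip the write entirely when the query matched nothing
        dst_hook.insert_rows(table=target_table, rows=scored)
    return len(scored)


def score_task() -> int:
    """python_callable for a PythonOperator (Airflow 2 provider import path assumed)."""
    # Imported inside the function so this module loads even without Airflow installed
    from airflow.providers.mysql.hooks.mysql import MySqlHook

    hook = MySqlHook(mysql_conn_id="mysql_default", schema="testdb")
    # my_table, scored_table and their columns are hypothetical placeholders
    return fetch_score_ship(hook, hook, "SELECT id, col FROM my_table WHERE col > 100",
                            "scored_table")
```

Creating the hook inside the callable, rather than at module level, means the database connection is only opened when the task runs, not every time the scheduler parses the DAG file.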

Can someone explain how to do this?

Reference: http://markmail.org/message/x6nfeo6zhjfeakfe

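# Airflow 2 import paths (apache-airflow-providers-mysql)
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook

def do_work():
    # Connection id and database; schema is a hook argument, not a get_records() argument
    mysqlserver = MySqlHook(mysql_conn_id='mysql_default', schema='testdb')
    sql = "SELECT * from table where col > 100 "
    # get_records() executes the query and returns all rows as a list of tuples
    records = mysqlserver.get_records(sql)
    print(records[0][0])  # first column of the first row

callMYSQLHook = PythonOperator(
    task_id='fetch_from_testdb',
    python_callable=do_work,  # the function to run, not the hook
    dag=dag
)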

Is this the correct way to proceed? Also, how do we use XComs to store the records returned by the following MySqlOperator?

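from airflow.providers.mysql.operators.mysql import MySqlOperator

t = MySqlOperator(
    mysql_conn_id='mysql_default',  # the operator's connection argument is mysql_conn_id
    task_id='basic_mysql',
    sql="SELECT count(*) from table1 where id > 10",
    dag=dag)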
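Since MySqlOperator only executes the query, one way to get the count into XCom is to run it from a PythonOperator instead. A minimal sketch, assuming Airflow 2: whatever a PythonOperator callable returns is pushed to XCom under the key "return_value", and a downstream callable that declares a ti parameter receives the TaskInstance and can read the value with ti.xcom_pull(task_ids=...). The task id fetch_from_testdb and the query come from the question; the function names fetch_count and summarize are hypothetical. XCom is meant for small values such as this count, not for whole result sets.

```python
from typing import Any, Optional


def fetch_count(hook: Optional[Any] = None) -> int:
    """Upstream python_callable: run the COUNT query and return the scalar.

    PythonOperator pushes this return value to XCom as "return_value".
    """
    if hook is None:
        # Lazy import (Airflow 2 provider path assumed) so the module loads without Airflow
        from airflow.providers.mysql.hooks.mysql import MySqlHook
        hook = MySqlHook(mysql_conn_id="mysql_default")
    records = hook.get_records("SELECT count(*) FROM table1 WHERE id > 10")
    return int(records[0][0])  # first column of the first (only) row


def summarize(ti: Any) -> str:
    """Downstream python_callable: Airflow 2 passes the TaskInstance as `ti`
    because the callable declares a parameter with that name."""
    count = ti.xcom_pull(task_ids="fetch_from_testdb")
    return f"{count} rows in table1 have id > 10"
```

In the DAG, fetch_count would be the python_callable of the task with task_id='fetch_from_testdb', summarize the python_callable of a second PythonOperator, and the two tasks chained with >> so the pull happens after the push.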

Answer

Sure, just create a hook (for example MySqlHook) and call its get_records() method, which it inherits from DbApiHook: https://airflow.readthedocs.io/en/stable/_modules/airflow/hooks/dbapi_hook.html
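Under the hood, get_records() follows the plain DB-API pattern: execute the SQL on a cursor, optionally with parameters, and return fetchall(), a list of row tuples. The sketch below reproduces that pattern with the standard library's sqlite3 module standing in for MySQL so it runs without a server; it is a simplification for illustration, not the hook's actual source, and the readings table and its data are made up. Note that sqlite3 uses ? placeholders, while MySQL drivers use %s.

```python
import sqlite3
from typing import Any, List, Optional, Sequence, Tuple


def get_records(conn: sqlite3.Connection, sql: str,
                parameters: Optional[Sequence[Any]] = None) -> List[Tuple[Any, ...]]:
    """Simplified DB-API pattern: execute on a cursor, return every row via fetchall()."""
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters or ())
        return cur.fetchall()
    finally:
        cur.close()


def main() -> None:
    # In-memory SQLite database standing in for the MySQL "testdb" schema
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE readings (id INTEGER, col INTEGER)")
    conn.executemany("INSERT INTO readings VALUES (?, ?)",
                     [(1, 50), (2, 150), (3, 300)])
    # Pass the threshold as a parameter instead of formatting it into the SQL string
    rows = get_records(conn, "SELECT id, col FROM readings WHERE col > ? ORDER BY id", (100,))
    for row_id, value in rows:  # each row is a tuple
        print(row_id, value)
    conn.close()


if __name__ == "__main__":
    main()
```

With a real hook the same call would be mysqlserver.get_records(sql, parameters=(100,)), with %s in place of ? in the SQL.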