Optimizing Airflow Task that transfers data from BigQuery into MongoDB


Problem Description


I need to improve the performance of an Airflow task that transfers data from BigQuery to MongoDB. The relevant task in my DAG uses a PythonOperator, and simply calls the following python function to transfer a single table/collection:

import time

from google.cloud import bigquery
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pymongo import MongoClient

# MONGO_URI is assumed to be defined elsewhere in the module; the imports
# above are inferred from how the function uses these names.

def transfer_full_table(table_name):
    start_time = time.time()

    # (1) Connect to BigQuery + Mongo DB
    bq = bigquery.Client()
    cluster = MongoClient(MONGO_URI)
    db = cluster["dbname"]
    print(f'(1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')

    # (2)-(3) Run the BQ Queries
    full_query = f"select * from `gcpprojectid.models.{table_name}`"
    results1 = bq.query(full_query)
    print(f'(2) Queried BigQuery: {round(time.time() - start_time, 5)}')
    results = results1.to_dataframe()
    print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')

    # (4) Handle Missing DateTimes   # Can we refactor this into its own function?
    datetime_cols = [key for key in dict(results.dtypes) if is_datetime(results[key])]
    for col in datetime_cols:
        results[[col]] = results[[col]].astype(object).where(results[[col]].notnull(), None)
    print(f'(4) Resolved Datetime Issue: {round(time.time() - start_time, 5)}')

    # (5) And Insert Properly Into Mongo
    db[table_name].drop()
    db[table_name].insert_many(results.to_dict('records'))
    print(f'(5) Wrote to Mongo: {round(time.time() - start_time, 5)}')

The DAG is set up to transfer many tables from BigQuery to MongoDB (one transfer for each task), and this particular transfer_full_table function is meant to transfer one entire table, so it simply:

  • queries the entire BQ table
  • converts it to pandas and fixes type issues
  • drops the previous MongoDB collection and reinserts the rows

I am attempting to use this function on a table that is 60MB in size, and here is the performance of the various parts of the task:

(1) Connected to BQ + Mongo: 0.0786
(2) Queried BigQuery: 0.80595
(3) Converted to Pandas DF: 87.2797
(4) Resolved Datetime Issue: 88.33461
(5) Wrote to Mongo: 213.92398

Steps 3 and 5 are taking all of the time. The task very quickly connects to BQ and Mongo (1), and BQ can very quickly query this table at 60MB (2). However, when I convert to a pandas dataframe (3) (needed for (4) to handle a type issue I was having), this step takes ~86.5 seconds. Resolving the date-time issue is then very fast (4), however at the end, dropping the previous MongoDB collection and re-inserting the new pandas dataframe into MongoDB (5) then takes (213.9 - 88.3) = ~125 seconds.

I am seeking any tips, either on the Pandas or on the MongoDB end, as to how I can optimize for these two bottlenecks.

Solution

The short answer is that asynchronous operations are muddying your profiling.

The docs on bq.query state that the resulting google.cloud.bigquery.job.QueryJob object is an asynchronous query job. This means that, after the query is submitted, the Python interpreter does not block until you try to use the results of the query through one of the synchronous QueryJob methods, such as to_dataframe(). A significant share of the 87 seconds you're seeing is likely just spent waiting for the query to return.

You could wait for the query to complete by calling QueryJob.done() repeatedly until it returns True, and only then call your second profiling print statement.
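A minimal sketch of that measurement change, reusing the client setup and table naming from the question (the table_name placeholder and the 0.5-second polling interval are just illustrative):

import time
from google.cloud import bigquery

bq = bigquery.Client()
table_name = "example_table"  # placeholder; in the DAG this comes from the task
start_time = time.time()

# Submitting the query returns almost immediately, because QueryJob is asynchronous.
query_job = bq.query(f"select * from `gcpprojectid.models.{table_name}`")
print(f'(2a) Submitted query: {round(time.time() - start_time, 5)}')

# Poll until BigQuery reports the job as finished on the server side.
while not query_job.done():
    time.sleep(0.5)
print(f'(2b) Query finished server-side: {round(time.time() - start_time, 5)}')

# Only the time spent past this point reflects downloading the result rows
# and materializing them as a pandas DataFrame.
results = query_job.to_dataframe()
print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')

If the gap between (2b) and (3) stays large, the time really is going into the download and conversion rather than into query execution.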

This isn't quite an optimization of your code, but it hopefully helps move things in the right direction. It's possible that some tuning of the pandas round trip could help, but I think it's likely that most of your time is spent waiting on reads and writes from your databases, and that writing more efficient queries, or a larger number of smaller ones, is going to be your only option for cutting down the total time.
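Two levers that sometimes help with those specific read/write costs are downloading the result set through the BigQuery Storage API and writing to Mongo in smaller, unordered batches. A rough sketch, assuming the optional google-cloud-bigquery-storage package is installed and reusing MONGO_URI, "dbname", and table_name from the question (the batch size is purely illustrative):

from google.cloud import bigquery, bigquery_storage
from pymongo import MongoClient

bq = bigquery.Client()
# The Storage API streams result pages in parallel, which can shrink the
# to_dataframe() step compared with the default REST download path.
bqstorage_client = bigquery_storage.BigQueryReadClient()

table_name = "example_table"  # placeholder, as above
job = bq.query(f"select * from `gcpprojectid.models.{table_name}`")
results = job.to_dataframe(bqstorage_client=bqstorage_client)

# ... datetime handling as in the original function ...

records = results.to_dict('records')
collection = MongoClient(MONGO_URI)["dbname"][table_name]  # MONGO_URI defined elsewhere, as in the question
collection.drop()

# ordered=False lets MongoDB keep going past individual failures, and
# fixed-size batches keep each insert request reasonably small.
batch_size = 10_000  # illustrative; tune to your document size
for i in range(0, len(records), batch_size):
    collection.insert_many(records[i:i + batch_size], ordered=False)

Neither change is guaranteed to help; whether it does depends on how much of the remaining time is network transfer versus server-side work.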
