Improve throughput of ndb query over large data


Problem description

I am trying to perform some data processing in a GAE application over data that is stored in the Datastore. The bottleneck is the rate at which the query returns entities, and I wonder how to improve the query's performance.

What I do in general:

  • everything works in a task queue, so we have plenty of time (10-minute deadline).
  • I run a query over the ndb entities in order to select which entities need to be processed.
  • as the query returns results, I group entities in batches of, say, 1000 and send them to another task queue for further processing.
  • the stored data is going to be large (say 500K-1M entities) and there is a chance that the 10-minute deadline is not enough. Therefore, when the task approaches the taskqueue deadline, I spawn a new task. This means I need an ndb.Cursor in order to continue the query from where it stopped (a sketch of this pipeline follows the list).
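
For illustration, here is a minimal sketch of that pipeline. The model, the handler URL and the deadline_is_near() helper are placeholders, not my actual code:

    from google.appengine.api import taskqueue
    from google.appengine.datastore.datastore_query import Cursor
    from google.appengine.ext import ndb

    class MyEntity(ndb.Model):                    # hypothetical model
        needs_processing = ndb.BooleanProperty()
        segment = ndb.IntegerProperty()           # used by the segmented variant below

    BATCH_SIZE = 1000

    def process_entities(websafe_cursor=None):
        # Restore the cursor from a previous incarnation of the task, if any.
        cursor = Cursor(urlsafe=websafe_cursor) if websafe_cursor else None
        query = MyEntity.query(MyEntity.needs_processing == True)
        more = True
        while more and not deadline_is_near():    # deadline check: placeholder
            entities, cursor, more = query.fetch_page(
                BATCH_SIZE, start_cursor=cursor)
            send_to_process_queue(entities)       # hand a batch to the workers
        if more and cursor:
            # Out of time: spawn a new task that resumes from the cursor.
            taskqueue.add(url='/process',
                          params={'cursor': cursor.urlsafe()})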

The problem is the rate at which the query returns entities. I have tried several approaches and observed the following performance (which is too slow for my app):

Use fetch_page() in a while loop.

The code is straightforward:

    cursor, has_more = None, True
    while has_more and theres_more_time:
        # Resume each page from where the previous one stopped.
        entities, cursor, more = query.fetch_page(1000, start_cursor=cursor)
        send_to_process_queue(entities)
        has_more = more and cursor

With this approach, it takes 25-30 seconds to process 10K entities. Roughly speaking, that is 20K entities per minute. I tried changing the page size or the class of the frontend instance; neither made any difference in performance.

Segment the data and fire multiple fetch_page_async() in parallel.

This approach is taken from here (approach C)

The overall performance remains the same as above. I tried various numbers of segments (from 2 to 10) in order to have 2-10 parallel fetch_async() calls. In all cases the overall time remained the same; the more parallel fetch_page_async() calls were issued, the longer each one took to complete. I also tried 20 parallel fetches and it got worse. Changing the page size or the frontend instance class did not have an impact either.
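
Roughly, the segmented variant I tried looks like the sketch below, using the hypothetical MyEntity model from the earlier sketch, whose integer segment property partitions the data:

    def fetch_segments(num_segments, page_size=1000):
        # Fire one asynchronous page fetch per data segment.
        futures = [
            MyEntity.query(MyEntity.segment == seg).fetch_page_async(page_size)
            for seg in range(num_segments)
        ]
        ndb.Future.wait_all(futures)
        entities = []
        for future in futures:
            page, cursor, more = future.get_result()
            entities.extend(page)
        return entities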

Fetch everything with a single fetch() call.

Now this is the least suitable approach (if not unsuitable at all), as the instance may run out of memory; plus, I don't get a cursor in case I need to spawn another task (in fact I won't even have the ability to do so, the task will simply exceed the deadline). I tried it out of curiosity, to see how it performs, and I observed the best performance! It took 8-10 seconds for 10K entities, which is roughly 60K entities per minute. That is approx. 3 times faster than fetch_page(). I wonder why this happens.
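
For reference, the single-call variant is just:

    # Fetch every matching entity at once; everything is buffered
    # in memory and no cursor is exposed for resuming later.
    entities = query.fetch()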

Use query.iter() in a single loop.

This is much like the first approach. It makes use of the query iterator's underlying generator, plus I can obtain a cursor from the iterator in case I need to spawn a new task, so it suits me. With the query iterator, it fetched 10K entities in 16-18 seconds, which is approx. 36-40K entities per minute. The iterator is 30% faster than fetch_page(), but much slower than fetch().
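
Sketched roughly, the iterator loop looks like this; produce_cursors=True is what makes a resume cursor available mid-iteration, and deadline_is_near() and spawn_new_task() are placeholders:

    iterator = query.iter(produce_cursors=True, batch_size=1000)
    batch = []
    for entity in iterator:
        batch.append(entity)
        if len(batch) >= 1000:
            send_to_process_queue(batch)
            batch = []
        if deadline_is_near():
            # Grab a cursor just after the last entity returned and
            # hand it to a fresh task, then stop this one.
            spawn_new_task(iterator.cursor_after().urlsafe())
            break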

For all the above approaches, I tried F1 and F4 frontend instances without any difference in Datastore performance. I also tried to change the batch_size parameter in the queries, still without any change.

A first question is why fetch(), fetch_page() and iter() behave so differently, and how to make either fetch_page() or iter() perform as well as fetch(). Another critical question is whether these throughputs (20-60K entities per minute, depending on the API call) are the best we can do in GAE.

I'm aware of the MapReduce API, but I think it doesn't suit me. AFAIK, the MapReduce API doesn't support queries, and I don't want to scan all the Datastore entities (that would be too costly and slow - the query may return only a few results). Last but not least, I have to stick to GAE. Resorting to another platform is not an option for me. So the question really is how to optimize the ndb query.

Any suggestions?

Solution

In case anyone is interested, I was able to significantly increase the throughput of the data processing by re-designing the component - it was suggested that I change the data models but that was not possible.

First, I segmented the data and then processed each data segment in a separate taskqueue.Task, instead of calling multiple fetch_page_async() from a single task (as I described in the first post). Initially, these tasks were processed by GAE sequentially, utilizing only a single Fx instance. To achieve parallelization of the tasks, I moved the component to a specific GAE module and used basic scaling, i.e. addressable Bx instances. When I enqueue the tasks for each data segment, I explicitly instruct which basic instance will handle each task by specifying the 'target' option.
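
The fan-out looks roughly like the sketch below; the handler URL, queue name and module name are placeholders, and the target value follows the instance.version.module addressing format for basic-scaling modules:

    from google.appengine.api import taskqueue

    NUM_SEGMENTS = 5

    def fan_out():
        for seg in range(NUM_SEGMENTS):
            taskqueue.add(
                url='/segment_worker',            # placeholder handler
                params={'segment': seg},
                queue_name='segments',            # placeholder queue
                # Route each segment to its own basic-scaling instance,
                # so the tasks run in parallel on separate Bx instances.
                target='%d.v1.processing' % seg,
            )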

With this design, I was able to process 20,000 entities in total within 4-5 seconds (instead of 40-60 seconds!), using 5 B4 instances.

Now, this has additional costs because of the Bx instances. We'll have to fine-tune the type and number of basic instances we need.
