Best practice to query large number of ndb entities from datastore


Question

I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard; it is just a histogram of the usage samples that pass a specific filter. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below), the RPCs always seem to fall back to a waterfall of sequential next queries.

Note: the query and analysis code does work, it just runs too slowly because I can't get data quickly enough from the datastore.

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature, ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']. So the tags form a denormalized set of tokens that could be used to find samples of interest.
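
For illustration, a minimal sketch of how such a sample might be written, assuming a hypothetical record_sample helper (not part of the original code):

import datetime

def record_sample(session_key, feature_name, tag_values):
   """Hypothetical helper: store one usage sample with its denormalized tags."""
   sample = Sample(name      = feature_name,           # e.g. 'systemA.feature_x'
                   session   = session_key,
                   timestamp = datetime.datetime.utcnow(),
                   tags      = tag_values)              # e.g. ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']
   sample.put()
   return sample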

The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).

So the input to the handler would be something like:

  • Start date
  • End date
  • Tags

And the output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]
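
The aggregation helpers getPeriodTimestamp and addCountForPeriod are referenced in the code below but their bodies are not shown in the question. A minimal sketch of what they might look like, assuming daily buckets accumulated in a module-level dict keyed by customer account (names other than the two referenced helpers are assumptions):

import collections
import datetime

# Hypothetical module-level accumulator: period_counts[customer][period] -> count
period_counts = collections.defaultdict(lambda: collections.defaultdict(int))

def getPeriodTimestamp(timestamp):
   """Truncate a sample timestamp to the start of its reporting period (daily here)."""
   return datetime.datetime(timestamp.year, timestamp.month, timestamp.day)

def addCountForPeriod(count_key, timestamp):
   """Increment the histogram bucket for this customer account and period."""
   period = getPeriodTimestamp(timestamp).date().isoformat()
   period_counts[count_key][period] += 1

def buildOutput():
   """Shape the accumulated counts into the output structure shown above."""
   return [{'company_account': customer,
            'counts': [{'timeperiod': period, 'count': count}
                       for period, count in sorted(counts.items())]}
           for customer, counts in period_counts.items()]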

Common Code for Queries

Here is some code in common for all queries. The general structure of the handler is a simple get handler using webapp2 that sets up the query parameters, runs the query, processes the results, and creates the data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

Methods Tried

I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single iteration

This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items, letting ndb do what it does to pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

# Split up the timestamp space into 20 equal parts and async query each of them
ts_intervals   = 20
ts_delta       = (end_time - start_time) / ts_intervals
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover the full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single iteration: real 15.645s

This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

B. Large fetch: real 12.12s

Effectively the same as option A but a bit faster for some reason.

C. Async fetches across time range: real 15.251s

Appears to provide more parallelism at the start but seems to get slowed down by a sequence of calls to next during iteration of the results. Also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

D. Async mapping: real 13.752s

This one is the hardest for me to understand. It looks like it has a good deal of overlap, but everything seems to stretch out in a waterfall instead of in parallel.

Based upon all this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down large numbers of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to App Engine in parallel, but this seems pretty brute force. I would really expect App Engine to be able to handle this use case, so I am guessing there is something I am missing.

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPCs can be in flight at one time
  • Try to denormalize to the point that there are no secondary queries
  • This type of task is better left to map reduce and task queues, not real-time queries

So what I did to make it faster:

  • I partitioned the query space based upon time from the start. (Note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing (see the sketch after this list)
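
A rough sketch of that overlap pattern, assuming the same time-partitioned queries as option C (the exact partitioning and helper names may differ from the author's final code):

from google.appengine.ext import ndb

def run_partitioned_queries(q, start_time, end_time, intervals=20):
   """Kick off one async fetch per time slice, then process whichever finishes first."""
   ts_delta = (end_time - start_time) / intervals
   futures = []
   for i in range(intervals):
      slice_start = start_time + i * ts_delta
      slice_end   = end_time if i == intervals - 1 else slice_start + ts_delta
      futures.append(q.filter(Sample.timestamp >= slice_start,
                              Sample.timestamp < slice_end).fetch_async(batch_size=500))

   # Overlap processing with the still-pending RPCs: handle results as they arrive
   pending = set(futures)
   while pending:
      finished = ndb.Future.wait_any(pending)
      pending.discard(finished)
      for sample in finished.get_result():
         handle_sample(sample)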

I am still not getting the performance I would expect or like, but it is workable for now. I just wish there were a better way to pull large numbers of sequential entities into memory quickly in handlers.

Answer

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, with (I believe) the normal memory constraints (the default F1 instances have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls another URL every ~5 seconds; that URL responds with true/false depending on whether the task queue task has completed yet. The task queue processes the data, which can take some tens of seconds, and saves the result to the datastore, either as the computed data or a rendered web page. Once the initial page detects that the task has completed, the user is redirected to the page that fetches the now-computed results from the datastore.
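
A minimal sketch of that pattern, using the deferred library (which needs the deferred builtin enabled in app.yaml) and a datastore entity to hold the result. The Report model, URL routes, and compute_histogram function are illustrative assumptions, not part of the answer:

import json
import uuid
import webapp2
from google.appengine.ext import ndb, deferred

class Report(ndb.Model):
   """Holds the status and result of one analysis run (hypothetical)."""
   done   = ndb.BooleanProperty(default=False)
   result = ndb.JsonProperty()

def run_report(report_id, start, end, tags):
   # Runs on the task queue, so it gets up to 10 minutes instead of 60s
   result = compute_histogram(start, end, tags)   # assumed long-running query/aggregation
   report = Report.get_by_id(report_id)
   report.result = result
   report.done = True
   report.put()

class StartReportHandler(webapp2.RequestHandler):
   def get(self):
      report_id = uuid.uuid4().hex
      Report(id=report_id).put()
      deferred.defer(run_report, report_id,
                     self.request.get('start'), self.request.get('end'),
                     self.request.get_all('tag'))
      # The returned page would poll /report_status?id=... every ~5s
      self.response.write(json.dumps({'report_id': report_id}))

class ReportStatusHandler(webapp2.RequestHandler):
   def get(self):
      report = Report.get_by_id(self.request.get('id'))
      self.response.headers['Content-Type'] = 'application/json'
      self.response.write(json.dumps({'done': report.done,
                                      'result': report.result if report.done else None}))

app = webapp2.WSGIApplication([('/start_report', StartReportHandler),
                               ('/report_status', ReportStatusHandler)])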
