Best practice to query large number of ndb entities from datastore


Question


I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard, it is just a histogram of items that pass a specific filter of the usage samples. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below) it always seems that the RPCs fall back to a waterfall of sequential next calls.

Note: the query and analysis code does work, it just runs too slowly because I can't get data quickly enough from the datastore.

Background

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature, ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']. So the tags form a denormalized set of tokens that could be used to find samples of interest.

The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).

So the input to the handler would be something like:

  • Start Date
  • End Date
  • Tag(s)

Output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Common Code for Queries

Here is some code common to all of the queries. The general structure of the handler is a simple get handler using webapp2 that sets up the query parameters, runs the query, processes the results, and creates the data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
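
For context, a minimal sketch of what such a webapp2 handler skeleton could look like. The handler name, route, and the parse_date()/run_analysis() helpers are assumptions for illustration, not the actual production code:

import json
import webapp2

class UsageReportHandler(webapp2.RequestHandler):    # hypothetical handler name
   def get(self):
      # Parse the query parameters (start/end dates, tags) from the request
      start_time = parse_date(self.request.get('start_date'))    # hypothetical helper
      end_time   = parse_date(self.request.get('end_date'))
      tags       = self.request.get_all('tag')

      # Build and run the query as shown above, then aggregate the counts
      results = run_analysis(start_time, end_time, tags)          # hypothetical helper wrapping the code above

      self.response.headers['Content-Type'] = 'application/json'
      self.response.write(json.dumps(results))

app = webapp2.WSGIApplication([('/usage_report', UsageReportHandler)])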

Methods Tried

I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single Iteration

This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items letting ndb do what it does to pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single Iteration

real: 15.645s

This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

B. Large Fetch

real: 12.12s

Effectively the same as option A but a bit faster for some reason.

C. Async fetches across time range

real: 15.251s

Appears to provide more parallelism at the start but seems to get slowed down by a sequence of calls to next during iteration of the results. Also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

D. Async mapping

real: 13.752s

This one is the hardest for me to understand. It looks like it has a good deal of overlapping, but everything seems to stretch out in a waterfall instead of running in parallel.

Recommendations

Based upon all this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down a large number of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to app engine in parallel but this seems pretty brute force. I would really expect that app engine should be able to handle this use case so I am guessing there is something I am missing.

Update

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPCs can be in flight at once
  • Try to denormalize to the point that there are no secondary queries (see the model sketch after this list)
  • This type of task is better left to map reduce and task queues, not real-time queries
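
For example, denormalizing here could mean copying the customer identifier onto each Sample at write time so that handle_sample() no longer needs the secondary Session lookup. A minimal sketch, assuming a new customer property (not the author's actual schema):

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   customer  = ndb.StringProperty  (indexed = True)   # denormalized copy of Session.data['customer']
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)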

So what I did to make it faster:

  • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing (a rough sketch of this pattern follows this list)
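
A rough sketch of that pattern, assuming a hypothetical partition_time_range() helper that yields the (start, end) pairs for the time-based partitions:

q_futures = []
for cur_start, cur_end in partition_time_range(start_time, end_time, 20):   # hypothetical helper
   f = q.filter(Sample.timestamp >= cur_start,
                Sample.timestamp < cur_end).fetch_async(limit=None, **query_opts)
   q_futures.append(f)

while q_futures:
   f = ndb.Future.wait_any(q_futures)   # blocks only until the first outstanding query finishes
   q_futures.remove(f)
   for sample in f.get_result():        # process this chunk while the other queries keep running
      handle_sample(sample)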

I am still not getting the performance I would expect or like, but it is workable for now. I just wish there were a better way to pull large numbers of sequential entities into memory quickly in handlers.

Solution

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, and (I believe) normal memory constraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls another URL every ~5s, and that URL responds with true/false depending on whether the task queue task has completed yet. The task queue task processes the data, which can take tens of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that the task has completed, the user is redirected to the page that fetches the now-computed results from the datastore.
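
A minimal sketch of that flow. The URL routes, handler names, the Report model, and the render_polling_page()/run_analysis() helpers are assumptions for illustration; the enqueue call uses the standard taskqueue API:

import json
import webapp2
from google.appengine.api import taskqueue
from google.appengine.ext import ndb

class Report(ndb.Model):                  # hypothetical model holding the saved result
   done = ndb.BooleanProperty(default = False)
   data = ndb.JsonProperty()

class StartHandler(webapp2.RequestHandler):
   def get(self):
      report_key = Report().put()
      # Kick off the long-running work on the task queue (up to 10 minutes)
      taskqueue.add(url = '/tasks/compute_report',
                    params = {'report_id': report_key.urlsafe()})
      # Return a page that polls /report_status?report_id=... every ~5s
      self.response.write(render_polling_page(report_key.urlsafe()))   # hypothetical helper

class StatusHandler(webapp2.RequestHandler):
   def get(self):
      report = ndb.Key(urlsafe = self.request.get('report_id')).get()
      self.response.write(json.dumps({'done': report.done}))

class ComputeHandler(webapp2.RequestHandler):
   def post(self):
      report = ndb.Key(urlsafe = self.request.get('report_id')).get()
      report.data = run_analysis()        # the expensive datastore aggregation (hypothetical helper)
      report.done = True
      report.put()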
