异步写入 appengine blob 并在所有任务完成时完成它 [英] Writing to an appengine blob asynchronously and finalizing it when all tasks complete

查看:30
本文介绍了异步写入 appengine blob 并在所有任务完成时完成它的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个难题.

我正在遍历一组按日期参数化的 URL 并获取它们.例如,这里是一个例子:

I am iterating through a set of URLs parameterized by date and fetching them. For example, here is an example of one:

somewebservice.com?start=01-01-2012&end=01-10-2012

somewebservice.com?start=01-01-2012&end=01-10-2012

有时,从 URL 返回的内容会被截断(缺少随机结果并附有截断错误"消息),因为我定义了太大的范围,所以我必须将查询拆分为两个 URL

Sometimes, the content returned from the URL gets truncated (missing random results with a 'truncated error' message attached) because I've defined too large a range, so I have to split the query into two URLs

somewebservice.com?start=01-01-2012&end=01-05-2012

somewebservice.com?start=01-01-2012&end=01-05-2012

somewebservice.com?start=01-06-2012&end=01-10-2012

somewebservice.com?start=01-06-2012&end=01-10-2012

我递归执行此操作,直到结果不再被截断,然后我写入允许并发写入的 blob.

I do this recursively until the results aren't truncated anymore, and then I write to a blob, which allows concurrent writes.

这些 URL 提取调用/blob 写入中的每一个都在单独的任务队列任务中处理.

Each of these URL fetch calls/blob writes is handled in a separate task queue task.

问题是,我这辈子都无法设计一个计划来知道所有任务何时完成.我试过使用分片计数器,但递归使它变得困难.有人建议我使用 Pipeline API,所以我看了 3 遍 Slatkin 的演讲.它似乎不适用于递归(但我承认我仍然不完全理解库).

The problem is, I can't for the life of me devise a scheme to know when all the tasks have completed. I've tried using sharded counters, but the recursion makes it difficult. Someone suggested I use the Pipeline API, so I watched the Slatkin talk 3 times. It doesn't appear to work with recursion (but I admit I still don't fully understand the lib).

是否知道一组任务队列任务(以及递归生成的子任务)何时完成,以便我可以完成我的 blob 并对其进行任何处理?

Is there anyway to know when a set of task queue tasks (and children that get spawned recursively) are completed so I can finalize my blob and do whatever with it?

谢谢,约翰

推荐答案

你读过管道入门 文档?管道可以创建其他管道并等待它们,所以做你想做的事情相当简单:

Have you read the Pipelines Getting Started docs? Pipelines can create other pipelines and wait on them, so doing what you want is fairly straightforward:

class RecursivePipeline(pipeline.Pipeline):
  def run(self, param):
    if some_condition: # Too big to process in one
      p1 = yield RecursivePipeline(param1)
      p2 = yield RecursivePipeline(param2)
      yield RecursiveCombiningPipeline(p1, p2)

其中 RecursiveCombiningPipeline 只是充当两个子管道的值的接收器.

Where RecursiveCombiningPipeline simply acts as a receiver for the values of the two sub-pipelines.

这篇关于异步写入 appengine blob 并在所有任务完成时完成它的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆