How can tasks be prioritized when using the task queue on Google App Engine?

Question
I'm trying to solve the following problem:
- I have a series of "tasks" which I would like to execute
- I have a fixed number of workers to execute these tasks (since they call an external API using urlfetch, and the number of parallel calls to this API is limited)
- I would like these "tasks" to be executed "as soon as possible" (i.e. with minimum latency)
- These tasks are parts of larger tasks and can be categorized based on the size of the original task (i.e. a small original task might generate 1 to 100 tasks, a medium one 100 to 1000 and a large one over 1000)
The tricky part: I would like to do all this as efficiently as possible (i.e. with minimum latency and as many parallel API calls as possible, without going over the limit), but at the same time prevent the large number of tasks generated from "large" original tasks from delaying the tasks generated from "small" original tasks.
To put it another way: I would like to have a "priority" assigned to each task, with tasks from "small" originals having a higher priority, and thus prevent starvation caused by "large" tasks.
Some searching around doesn't seem to indicate that anything pre-made is available, so I came up with the following:
- create three push queues: tasks-small, tasks-medium, tasks-large
- set a maximum number of concurrent requests for each, such that the total is the maximum number of concurrent API calls (for example, if the maximum number of concurrent API calls is 200, I could set max_concurrent_requests to 30 for tasks-small, 60 for tasks-medium and 100 for tasks-large)
- when enqueueing a task, check the number of pending tasks in each queue (using something like the QueueStatistics class) and, if another queue is not 100% utilized, enqueue the task there; otherwise just enqueue the task on the queue corresponding to its size
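A split like this would be declared in queue.yaml. The sketch below is illustrative: only the 30/60/100 concurrency split (totalling the assumed 200-call API limit) comes from the text; the rate values are placeholder assumptions.

```yaml
# queue.yaml - minimal sketch of the three push queues described above.
# Only max_concurrent_requests reflects the numbers in the text;
# the rate values are assumed.
queue:
- name: tasks-small
  rate: 50/s
  max_concurrent_requests: 30
- name: tasks-medium
  rate: 50/s
  max_concurrent_requests: 60
- name: tasks-large
  rate: 50/s
  max_concurrent_requests: 100
```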
For example, if we have a task T1 which is part of a small original task, first check if tasks-small has free "slots" and enqueue it there. Otherwise check tasks-medium and tasks-large. If none of them have free slots, enqueue it on tasks-small anyway, and it will be processed after the tasks added before it are processed (note: this is not optimal, because if "slots" free up on the other queues, they still won't process pending tasks from the tasks-small queue).

An other option would be to use a PULL queue and have a central "coordinator" pull from that queue based on priorities and dispatch the tasks, however that seems to add a little more latency.

However, this seems a little bit hackish and I'm wondering if there are better alternatives out there.

EDIT: after some thoughts and feedback I'm thinking of using PULL queues after all, in the following way:
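The spill-over decision above can be sketched in plain Python. The queue limits mirror the assumed max_concurrent_requests split, and the pending_counts argument is a stand-in for what the QueueStatistics class would report on App Engine:

```python
# Illustrative sketch of the spill-over enqueue decision described above.
# In a real App Engine app the pending counts would come from
# taskqueue.QueueStatistics; here they are passed in as a plain dict.

QUEUE_LIMITS = {"tasks-small": 30, "tasks-medium": 60, "tasks-large": 100}

def pick_queue(task_size, pending_counts):
    """Return the queue name for a task of size 'small'/'medium'/'large'.

    Prefer the task's own queue if it has free slots; otherwise spill
    over to any other queue that is not fully utilized; as a last
    resort fall back to the task's own queue anyway.
    """
    own = "tasks-%s" % task_size
    # Try the task's own queue first, then the others as spill-over targets.
    candidates = [own] + [q for q in QUEUE_LIMITS if q != own]
    for q in candidates:
        if pending_counts.get(q, 0) < QUEUE_LIMITS[q]:
            return q
    return own  # every queue is full: enqueue on the own queue anyway
```

This also makes the noted drawback visible: once a task lands on a queue, it stays there even if slots later free up elsewhere.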
- have two PULL queues (medium-tasks and large-tasks)
- have a dispatcher (PUSH) queue with a concurrency of 1 (so that only one dispatch task runs at any time). Dispatch tasks are created in multiple ways:
  - by a once-a-minute cron job
  - after adding a medium/large task to the pull queues
  - after a worker task finishes
- have a worker (PUSH) queue with a concurrency equal to the number of workers
And the workflow:

- small tasks are added directly to the worker queue
- the dispatcher task, whenever it is triggered, does the following:
  - estimates the number of free workers (by looking at the number of running tasks in the worker queue)
  - for each "free" slot, it takes a task from the medium/large-tasks PULL queues and enqueues it on a worker (or more precisely: adds it to the worker PUSH queue, which will result in it being executed - eventually - on a worker)

I'll report back once this is implemented and at least moderately tested.
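The dispatch step can be sketched as plain, runnable Python. The list-based "queues" and WORKER_CONCURRENCY are stand-ins for the real taskqueue calls (Queue.lease_tasks, queue statistics), and draining medium before large is an assumption consistent with the small-over-large priority in the question:

```python
# Illustrative dispatcher sketch; the real implementation would lease from
# the PULL queues and add to the worker PUSH queue via the taskqueue API.

WORKER_CONCURRENCY = 10  # assumed number of workers (hard-coded, see notes)

def dispatch(running_worker_tasks, medium_tasks, large_tasks, worker_queue):
    """One dispatcher run: fill free worker slots from the pull queues."""
    free_slots = max(0, WORKER_CONCURRENCY - running_worker_tasks)
    for _ in range(free_slots):
        if medium_tasks:              # assumed higher-priority pull queue
            worker_queue.append(medium_tasks.pop(0))
        elif large_tasks:
            worker_queue.append(large_tasks.pop(0))
        else:
            break                     # both pull queues are empty
    return worker_queue
```

Because the dispatcher queue has a concurrency of 1, only one such run mutates the queues at a time.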
- I have multiple PULL queues, one for each priority
- many workers pull on an endpoint (handler)
- the handler generates a random number and does a simple "if less than 0.6, try the small queue first and then the large queue, else vice versa (large then small)"
- if the workers get no tasks or an error, they do semi-random exponential backoff up to a maximum timeout (i.e. they start pulling every 1 second and approximately double the timeout after each empty pull, up to 30 seconds)
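The last two points can be sketched as follows. The 0.6 bias and the 1 s to 30 s backoff come from the text; the jitter range used for the "semi-random" part is an assumption:

```python
import random

# Illustrative sketch of the worker-side logic described above.

def queue_order(rng=random):
    """Pick which pull queue to try first: ~60% small-first, ~40% large-first."""
    if rng.random() < 0.6:
        return ["small", "large"]
    return ["large", "small"]

def next_backoff(current, rng=random):
    """Approximately double the wait after an empty pull, capped at 30 s."""
    jitter = rng.uniform(0.8, 1.2)   # the "semi-random" factor (assumed range)
    return min(30.0, current * 2 * jitter)
```

Starting from 1 second, repeated empty pulls roughly double the interval (1, ~2, ~4, ...) until it saturates at 30 seconds.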
Answer

I implemented the solution described in the UPDATE:

- two PULL queues (medium-tasks and large-tasks)
- a dispatcher (PUSH) queue with a concurrency of 1
- a worker (PUSH) queue with a concurrency equal to the number of workers
See the question for more details. Some notes:

- there is some delay in task visibility due to eventual consistency (i.e. the dispatcher tasks sometimes don't see the tasks in the pull queue even if they were inserted together) - I worked around it by adding a countdown of 5 seconds to the dispatcher tasks and also adding a cron job that adds a dispatcher task every minute (so if the original dispatcher task doesn't "see" the task in the pull queue, another one will come along later)
- made sure to name every task to eliminate the possibility of double-dispatching them
- you can't lease 0 items from the PULL queues :-)
- batch operations have an upper limit, so you have to do your own batching over the batch taskqueue calls
- there doesn't seem to be a way to programmatically get the "maximum parallelism" value for a queue, so I had to hard-code it in the dispatcher (to calculate how many more tasks it can schedule)
- don't add dispatcher tasks if there are already some (at least 10) in the queue
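The "do your own batching" note can be sketched like this; enqueue_batch is a hypothetical stand-in for a size-limited batch call such as Queue.add with a list of tasks, and the limit of 100 is an assumption about the per-call cap:

```python
# Illustrative chunking helper for batching over a size-limited batch API.
BATCH_LIMIT = 100  # assumed per-call cap on batch taskqueue operations

def chunked(items, size=BATCH_LIMIT):
    """Yield successive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def add_all(tasks, enqueue_batch):
    """Enqueue any number of tasks via size-limited batch calls.

    Returns the number of batch calls made.
    """
    calls = 0
    for chunk in chunked(tasks):
        enqueue_batch(chunk)  # e.g. queue.add(chunk) on App Engine
        calls += 1
    return calls
```

For example, 250 tasks would be submitted as three batch calls of 100, 100 and 50 tasks.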