How to throttle script that creates celery tasks faster than they're consumed?

Problem Description

I have a script that generates millions of Celery tasks, one per row in the DB. Is there a way to throttle it so that it doesn't completely flood Celery?

Ideally I want to keep Celery busy, but I don't want the length of the Celery queue to exceed a few dozen tasks since that's just a waste of memory (especially since without some kind of throttle the script will add millions of tasks to the queue almost instantly).

Solution

I've spent some time on this problem over the past several days and came up with what I'm calling a CeleryThrottle object. Basically, you tell it how many items you want in a queue and it does its best to keep the queue between that size and 2× that size.

So here's the code (assumes Redis broker, but easily changed):

# coding=utf-8
from collections import deque

import time

import redis
from django.conf import settings
from django.utils.timezone import now


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return r.llen(queue_name)


class CeleryThrottle(object):
    """A class for throttling celery."""

    def __init__(self, min_items=100, queue_name='celery'):
        """Create a throttle to prevent celery run aways.

        :param min_items: The minimum number of items that should be enqueued. 
        A maximum of 2× this number may be created. This minimum value is not 
        guaranteed and so a number slightly higher than your max concurrency 
        should be used. Note that this number includes all tasks unless you use
        a specific queue for your processing.
        """
        self.min = min_items
        self.max = self.min * 2

        # Variables used to track the queue and wait-rate
        self.last_processed_count = 0
        self.count_to_do = self.max
        self.last_measurement = None
        self.first_run = True

        # Use a fixed-length queue to hold last N rates
        self.rates = deque(maxlen=15)
        self.avg_rate = self._calculate_avg()

        # For inspections
        self.queue_name = queue_name

    def _calculate_avg(self):
        return float(sum(self.rates)) / (len(self.rates) or 1)

    def _add_latest_rate(self):
        """Calculate the rate that the queue is processing items."""
        right_now = now()
        elapsed_seconds = (right_now - self.last_measurement).total_seconds()
        self.rates.append(self.last_processed_count / elapsed_seconds)
        self.last_measurement = right_now
        self.last_processed_count = 0
        self.avg_rate = self._calculate_avg()

    def maybe_wait(self):
        """Stall the calling function or let it proceed, depending on the queue.

        The idea here is to check the length of the queue as infrequently as
        possible while keeping the number of items in the queue between
        self.min and self.max as much of the time as possible.

        We do this by immediately enqueueing self.max items. After that, we 
        monitor the queue to determine how quickly it is processing items. Using 
        that rate we wait an appropriate amount of time or immediately press on.
        """
        self.last_processed_count += 1
        if self.count_to_do > 0:
            # Do not wait. Allow process to continue.
            if self.first_run:
                self.first_run = False
                self.last_measurement = now()
            self.count_to_do -= 1
            return

        self._add_latest_rate()
        task_count = get_queue_length(self.queue_name)
        if task_count > self.min:
            # Estimate how long the surplus will take to complete and wait that
            # long + 5% to ensure we're below self.min on next iteration.
            surplus_task_count = task_count - self.min
            wait_time = (surplus_task_count / self.avg_rate) * 1.05
            time.sleep(wait_time)

            # Assume we're below self.min due to waiting; max out the queue.
            if task_count < self.max:
                self.count_to_do = self.max - self.min
            return

        elif task_count <= self.min:
            # Add more items.
            self.count_to_do = self.max - task_count
            return
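
The note above says the Redis broker assumption is easy to change. As one illustration, here is a minimal, untested sketch of the same length check against a RabbitMQ broker using pika (the function name and connection parameters are my own, not from the answer):

import pika


def get_queue_length_rabbitmq(queue_name='celery'):
    """Count the ready messages in a RabbitMQ queue via a passive declare."""
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = conn.channel()
    # passive=True inspects the queue without creating or modifying it.
    declared = channel.queue_declare(queue=queue_name, passive=True)
    count = declared.method.message_count
    conn.close()
    return count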

Usage looks like:

throttle = CeleryThrottle()
for item in really_big_list_of_items:
    throttle.maybe_wait()
    my_task.delay(item)
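
As the __init__ docstring warns, the length check counts every task in the queue it watches, so if other work shares the default celery queue the throttle will see that too. A sketch of one way to isolate the bulk work on its own queue (the queue name throttled is illustrative):

throttle = CeleryThrottle(min_items=100, queue_name='throttled')
for item in really_big_list_of_items:
    throttle.maybe_wait()
    # Route to a dedicated queue so get_queue_length() counts only these tasks.
    my_task.apply_async(args=[item], queue='throttled')

A worker then has to consume from that queue as well, e.g. celery -A proj worker -Q throttled.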

Pretty simple and hopefully pretty flexible. With that in place, the code will monitor your queue and add waits to your loop if the queue is getting too long. This is in our github repo in case there are updates.

As it does this, it will track the rolling average speed of the tasks, and will try not to check the queue length more often than needed. For example, if tasks take two minutes each to run, after putting 100 items in the queue it can wait quite a while before it has to check the queue length again. A simpler version of this script could check the queue length every time through the loop, but that would add unnecessary delay. This version tries to be smart about it at the cost of sometimes being wrong (in which case the queue dips below min_items).
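
To make the wait computation concrete, here is a worked example with made-up numbers (none of these values come from the answer):

min_items = 100                               # so self.min = 100, self.max = 200
avg_rate = 0.5                                # rolling average: 0.5 tasks/second
task_count = 150                              # what get_queue_length() returned
surplus_task_count = task_count - min_items   # 50 tasks above self.min
wait_time = (surplus_task_count / avg_rate) * 1.05   # 100 s + 5% = 105.0 seconds

The loop sleeps about 105 seconds, by which point the workers should have drained the queue to (or slightly past) self.min, and the throttle tops it back up toward self.max.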
