Celery:具有相同参数的任务的速率限制 [英] Celery: Rate limit on tasks with the same parameters

查看:72
本文介绍了Celery:具有相同参数的任务的速率限制的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种限制何时调用函数的方法,但仅限于输入参数不同的情况,即:

I am looking for a way to restrict when a function is called, but only when the input parameters are different, that is:

@app.task(rate_limit="60/s")
def api_call(user):
   do_the_api_call()

for i in range(0,100):
  api_call("antoine")
  api_call("oscar")

因此,我希望每秒 60次 api_call("oscar") 被调用 api_call("antoine")每秒> 60次.

So I would like api_call("antoine") to be called 60 times per second and api_call("oscar") 60 times per second as well.

我该怎么做?

-编辑27/04/2015我尝试用一​​个任务中的rate_limit调用一个子任务,但是它也不起作用:rate_limit始终应用于所有实例化的子任务或任务(这是逻辑上的).

--EDIT 27/04/2015 I have tried calling a subtask with rate_limit within a task, but it does not work either: The rate_limit is always applied for all the instantiated subtasks or tasks (which is logical).

@app.task(rate_limit="60/s")
def sub_api_call(user):
   do_the_api_call()

@app.task
def api_call(user):
  sub_api_call(user)

for i in range(0,100):
  api_call("antoine")
  api_call("oscar")

最好!

推荐答案

我今天花了一些时间,提出了一个不错的解决方案.所有其他解决方案都存在以下问题之一:

I spent some time on this today and came up with a nice solution. All the other solutions to this have one of the following problems:

  • 他们要求任务进行无限次重试,从而使芹菜的重试机制失效.
  • 它们不会根据参数进行调节
  • 由于多个工作人员或队列而失败
  • 它们笨拙,等等.

基本上,您可以像这样包装任务:

Basically, you wrap your task like this:

@app.task(bind=True, max_retries=10)
@throttle_task("2/s", key="domain", jitter=(2, 15))
def scrape_domain(self, domain):
    do_stuff()

其结果是您将任务限制为每个域参数每秒运行2次,并在2-15秒之间随机重试抖动. key 参数是可选的,但与您任务中的参数相对应.如果没有给出关键参数,它将只是把任务限制在给定的速率上.如果提供了此功能,则节流将应用于(任务,键)二元组.

And the result is that you'll throttle the task to 2 runs per second per domain parameter, with a random retry jitter between 2-15s. The key parameter is optional, but corresponds to a parameter in your task. If the key parameter is not given, it'll just throttle the task to the rate given. If it is provided, then the throttle will apply to the (task, key) dyad.

查看此问题的另一种方法是不使用装饰器.这样可以提供更多的灵活性,但是您可以依靠自己进行重试.除了上述方法,您可以执行以下操作:

The other way to look at this is without the decorator. This gives a little more flexibility, but counts on you to do the retrying yourself. Instead of the above, you could do:

@app.task(bind=True, max_retries=10)
def scrape_domain(self, domain):
    proceed = is_rate_okay(self, "2/s", key=domain)
    if proceed:
        do_stuff()
    else:
        self.request.retries = task.request.retries - 1  # Don't count this as against max_retries.
        return task.retry(countdown=random.uniform(2, 15))

我认为这与第一个示例相同.更长一点,更分支,但是更清楚地显示了它的工作方式.我希望自己总是使用装饰器.

I think that's identical to the first example. A bit longer, and more branchy, but shows how it works a bit more clearly. I'm hoping to always use the decorator, myself.

这一切都通过在redis中保持计数来实现.实现非常简单.您在任务的redis中创建一个密钥(以及key参数,如果给定的话),然后根据提供的时间表使redis密钥失效.如果用户将速率设置为10/m,则将redis键设置为60s,并在每次尝试使用正确名称的任务时将其递增.如果增量器太高,请重试该任务.否则,请运行它.

This all works by keeping a tally in redis. The implementation is very simple. You create a key in redis for the task (and the key parameter, if given), and you expire the redis key according to the schedule provided. If the user sets a rate of 10/m, you make a redis key for 60s, and you increment it every time a task with the correct name is attempted. If your incrementor gets too high, retry the task. Otherwise, run it.

def parse_rate(rate: str) -> Tuple[int, int]:
    """

    Given the request rate string, return a two tuple of:
    <allowed number of requests>, <period of time in seconds>

    (Stolen from Django Rest Framework.)
    """
    num, period = rate.split("/")
    num_requests = int(num)
    if len(period) > 1:
        # It takes the form of a 5d, or 10s, or whatever
        duration_multiplier = int(period[0:-1])
        duration_unit = period[-1]
    else:
        duration_multiplier = 1
        duration_unit = period[-1]
    duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
    duration = duration_base * duration_multiplier
    return num_requests, duration


def throttle_task(
    rate: str,
    jitter: Tuple[float, float] = (1, 10),
    key: Any = None,
) -> Callable:
    """A decorator for throttling tasks to a given rate.

    :param rate: The maximum rate that you want your task to run. Takes the
    form of '1/m', or '10/2h' or similar.
    :param jitter: A tuple of the range of backoff times you want for throttled
    tasks. If the task is throttled, it will wait a random amount of time
    between these values before being tried again.
    :param key: An argument name whose value should be used as part of the
    throttle key in redis. This allows you to create per-argument throttles by
    simply passing the name of the argument you wish to key on.
    :return: The decorated function
    """

    def decorator_func(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Inspect the decorated function's parameters to get the task
            # itself and the value of the parameter referenced by key.
            sig = inspect.signature(func)
            bound_args = sig.bind(*args, **kwargs)
            task = bound_args.arguments["self"]
            key_value = None
            if key:
                try:
                    key_value = bound_args.arguments[key]
                except KeyError:
                    raise KeyError(
                        f"Unknown parameter '{key}' in throttle_task "
                        f"decorator of function {task.name}. "
                        f"`key` parameter must match a parameter "
                        f"name from function signature: '{sig}'"
                    )
            proceed = is_rate_okay(task, rate, key=key_value)
            if not proceed:
                logger.info(
                    "Throttling task %s (%s) via decorator.",
                    task.name,
                    task.request.id,
                )
                # Decrement the number of times the task has retried. If you
                # fail to do this, it gets auto-incremented, and you'll expend
                # retries during the backoff.
                task.request.retries = task.request.retries - 1
                return task.retry(countdown=random.uniform(*jitter))
            else:
                # All set. Run the task.
                return func(*args, **kwargs)

        return wrapper

    return decorator_func


def is_rate_okay(task: Task, rate: str = "1/s", key=None) -> bool:
    """Keep a global throttle for tasks

    Can be used via the `throttle_task` decorator above.

    This implements the timestamp-based algorithm detailed here:

        https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/

    Basically, you keep track of the number of requests and use the key
    expiration as a reset of the counter.

    So you have a rate of 5/m, and your first task comes in. You create a key:

        celery_throttle:task_name = 1
        celery_throttle:task_name.expires = 60

    Another task comes in a few seconds later:

        celery_throttle:task_name = 2
        Do not update the ttl, it now has 58s remaining

    And so forth, until:

        celery_throttle:task_name = 6
        (10s remaining)

    We're over the threshold. Re-queue the task for later. 10s later:

        Key expires b/c no more ttl.

    Another task comes in:

        celery_throttle:task_name = 1
        celery_throttle:task_name.expires = 60

    And so forth.

    :param task: The task that is being checked
    :param rate: How many times the task can be run during the time period.
    Something like, 1/s, 2/h or similar.
    :param key: If given, add this to the key placed in Redis for the item.
    Typically, this will correspond to the value of an argument passed to the
    throttled task.
    :return: Whether the task should be throttled or not.
    """
    key = f"celery_throttle:{task.name}{':' + str(key) if key else ''}"

    r = make_redis_interface("CACHE")

    num_tasks, duration = parse_rate(rate)

    # Check the count in redis
    count = r.get(key)
    if count is None:
        # No key. Set the value to 1 and set the ttl of the key.
        r.set(key, 1)
        r.expire(key, duration)
        return True
    else:
        # Key found. Check it.
        if int(count) <= num_tasks:
            # We're OK, run it.
            r.incr(key, 1)
            return True
        else:
            return False

这篇关于Celery:具有相同参数的任务的速率限制的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆