Celery creating a new connection for each task


Problem description

I'm using Celery with Redis to run some background tasks, but each time a task is called, it creates a new connection to Redis. I'm on Heroku and my Redis to Go plan allows for 10 connections. I'm quickly hitting that limit and getting a "max number of clients reached" error.

How can I ensure that Celery queues the tasks on a single connection rather than opening a new one each time?

EDIT - including the full traceback

File "/app/.heroku/venv/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
   response = callback(request, *callback_args, **callback_kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/hooks/framework_django.py", line 447, in wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 77, in wrapped_view
   return view_func(*args, **kwargs)

 File "/app/feedback/views.py", line 264, in zencoder_webhook_handler
   tasks.process_zencoder_notification.delay(webhook)

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 343, in delay
   return self.apply_async(args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/task.py", line 458, in apply_async
   with app.producer_or_acquire(producer) as P:

 File "/usr/local/lib/python2.7/contextlib.py", line 17, in __enter__
   return self.gen.next()

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/base.py", line 247, in producer_or_acquire
   with self.amqp.producer_pool.acquire(block=True) as producer:

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 705, in acquire
   R = self.prepare(R)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
   p = p()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in <lambda>
   return lambda: self.create_producer()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/pools.py", line 42, in create_producer
   return self.Producer(self._acquire_connection())

 File "/app/.heroku/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 160, in __init__
   super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 83, in __init__
   self.revive(self.channel)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/messaging.py", line 174, in revive
   channel = self.channel = maybe_channel(channel)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 879, in maybe_channel
   return channel.default_channel

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 617, in default_channel
   self.connection

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
   self._connection = self._establish_connection()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
   conn = self.transport.establish_connection()

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 722, in establish_connection
   self._avail_channels.append(self.create_channel(self))

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/virtual/__init__.py", line 705, in create_channel
   channel = self.Channel(connection)

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 271, in __init__
   self.client.info()

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/client.py", line 344, in info
   return self.execute_command('INFO')

 File "/app/.heroku/venv/lib/python2.7/site-packages/kombu/transport/redis.py", line 536, in execute_command
   conn.send_command(*args)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 273, in send_command
   self.send_packed_command(self.pack_command(*args))

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 256, in send_packed_command
   self.connect()

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/object_wrapper.py", line 166, in __call__
   self._nr_instance, args, kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/newrelic-1.4.0.137/newrelic/api/function_trace.py", line 81, in literal_wrapper
   return wrapped(*args, **kwargs)

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 207, in connect
   self.on_connect()

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 233, in on_connect
   if self.read_response() != 'OK':

 File "/app/.heroku/venv/lib/python2.7/site-packages/redis/connection.py", line 283, in read_response
   raise response

ResponseError: max number of clients reached

Solution

I wish I was using Redis, because there is a specific option to limit the number of connections: CELERY_REDIS_MAX_CONNECTIONS.

MongoDB has a similar backend setting.

Given these backend settings, I have no idea what BROKER_POOL_LIMIT actually does. Hopefully CELERY_REDIS_MAX_CONNECTIONS solves your problem.
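As an illustration, here is a minimal sketch of how those two settings might look in a Django `settings.py`, using the old-style uppercase setting names from this era of Celery. The Redis URL is a placeholder, not a real credential:

```python
# Sketch of Celery connection settings in a Django settings.py
# (pre-Celery 4.0 uppercase names). The URL is a placeholder.

BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"

# Caps connections opened by the Redis *result backend*.
CELERY_REDIS_MAX_CONNECTIONS = 5

# Caps the kombu broker-connection pool shared by task producers;
# setting it to 1 forces every .delay()/.apply_async() call in a
# process to reuse a single broker connection.
BROKER_POOL_LIMIT = 1
```

Note the two settings govern different things: `CELERY_REDIS_MAX_CONNECTIONS` applies to the result backend, while `BROKER_POOL_LIMIT` bounds the broker connections used when sending tasks, which is where the traceback above is failing.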

I'm one of those folks using CloudAMQP, and the AMQP backend does not have its own connection limit parameter.
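For intuition, the producer pool that `BROKER_POOL_LIMIT` bounds behaves roughly like the toy pool below. This is an illustration of the reuse idea only, not kombu's actual implementation or API:

```python
# Toy bounded connection pool, in the spirit of kombu's producer
# pool: connections are created lazily up to a limit, then reused.

class ToyPool:
    def __init__(self, limit):
        self.limit = limit
        self.free = []      # released connections, ready for reuse
        self.created = 0    # total connections ever opened

    def acquire(self):
        if self.free:
            return self.free.pop()   # reuse an existing connection
        if self.created < self.limit:
            self.created += 1
            return object()          # stand-in for a new connection
        raise RuntimeError("pool limit reached")

    def release(self, conn):
        self.free.append(conn)

pool = ToyPool(limit=1)
for _ in range(100):        # simulate 100 task sends
    conn = pool.acquire()
    pool.release(conn)      # returned to the pool, so it is reused
```

However many "tasks" are sent, only one connection is ever opened, which is the behavior the asker wants instead of one fresh Redis connection per `.delay()` call.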
