Calling celery task hangs for delay and apply_async
Question
I have created a celery app with the following directory structure (as given on the celery site):
proj
|-- celery.py
|-- celery.pyc
|-- __init__.py
|-- __init__.pyc
|-- tasks.py
`-- tasks.pyc
The following are the contents of celery.py:
from __future__ import absolute_import

from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitmquser:<my_password>@localhost:5672/localvhost',
             #backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)

if __name__ == '__main__':
    app.start()
The following are the contents of tasks.py:
from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)
Now I start the celery worker with the following command:
celery -A proj worker -l debug
I think the worker is running fine, as it outputs the following:
[2014-06-12 21:25:02,326: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: Building graph...
[2014-06-12 21:25:02,328: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Queues (intra), Pool, Autoscaler, Beat, Autoreloader, StateDB, Consumer}
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2014-06-12 21:25:02,331: DEBUG/MainProcess] | Consumer: Building graph...
[2014-06-12 21:25:02,334: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Agent, Heart, Gossip, event loop}
[2014-06-12 21:25:02,335: WARNING/MainProcess] /home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/apps/worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
-------------- celery@ansumanb-u12 v3.1.12 (Cipater)
---- **** -----
--- * *** * -- Linux-3.5.0-25-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x1f46690
- ** ---------- .> transport: amqp://rabbitmquser:**@localhost:5672/localvhost
- ** ---------- .> results: disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Hub
[2014-06-12 21:25:02,336: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,336: DEBUG/MainProcess] | Worker: Starting Pool
[2014-06-12 21:25:02,344: DEBUG/MainProcess] ^-- substep ok
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Worker: Starting Consumer
[2014-06-12 21:25:02,345: DEBUG/MainProcess] | Consumer: Starting Connection
After starting the worker, I open a terminal and run the following from the Python interpreter:
>>> from proj.tasks import add
>>> add(2,2)
4
>>> add.delay(2,3)
Here the delay call hangs (same story for apply_async). When I stop it with Ctrl+C, I get the following:
^CTraceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 453, in delay
return self.apply_async(args, kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/task.py", line 555, in apply_async
**dict(self._get_exec_options(), **options)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/base.py", line 352, in send_task
reply_to=reply_to or self.oid, **options
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/celery/app/amqp.py", line 305, in publish_task
**kwargs
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 168, in publish
routing_key, mandatory, immediate, exchange, declare)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured
return fun(*args, **kwargs)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 173, in _publish
channel = self.channel
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 190, in _get_channel
channel = self._channel = channel()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 422, in __call__
value = self.__value__ = self.__contract__()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/messaging.py", line 205, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 756, in default_channel
self.connection
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 741, in connection
self._connection = self._establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/connection.py", line 696, in _establish_connection
conn = self.transport.establish_connection()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
conn = self.Connection(**opts)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 171, in __init__
(10, 10), # start
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods)
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/connection.py", line 237, in _wait_method
self.method_reader.read_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
self._next_method()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 153, in read_frame
frame_type, channel, size = unpack('>BHI', read(7, True))
File "/home/ansumanb/.virtualenvs/celery_venv/local/lib/python2.7/site-packages/amqp/transport.py", line 272, in _read
s = recv(n - len(rbuf))
KeyboardInterrupt
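The traceback ends in read_frame, blocked in recv while waiting for the first 7 bytes of a frame, which suggests the TCP connection was opened but the broker never answered the handshake. A minimal sketch, assuming Python 3 and only the standard library, that reproduces the same wait with a timeout instead of hanging. The function name probe_amqp_handshake and the default host, port, and timeout are hypothetical choices for illustration; the header bytes are the standard AMQP 0-9-1 protocol header.

```python
import socket
import struct

# Standard AMQP 0-9-1 protocol header that a client sends first.
AMQP_PROTOCOL_HEADER = b"AMQP\x00\x00\x09\x01"


def probe_amqp_handshake(host="localhost", port=5672, timeout=5.0):
    """Return a short status string describing how far the handshake gets."""
    try:
        sock = socket.create_connection((host, port), timeout=timeout)
    except OSError as exc:
        return "connect failed: %s" % exc
    try:
        sock.settimeout(timeout)
        sock.sendall(AMQP_PROTOCOL_HEADER)
        # Read the 7-byte frame header (type, channel, size), as read_frame does.
        header = b""
        while len(header) < 7:
            chunk = sock.recv(7 - len(header))
            if not chunk:
                return "broker closed the connection"
            header += chunk
        if header.startswith(b"AMQP"):
            # The broker echoes a protocol header when it rejects the version.
            return "broker rejected the protocol version"
        frame_type, channel, size = struct.unpack(">BHI", header)
        return "broker answered: frame_type=%d channel=%d size=%d" % (
            frame_type, channel, size)
    except socket.timeout:
        return "timed out waiting for the broker's first frame"
    except OSError as exc:
        return "socket error: %s" % exc
    finally:
        sock.close()


if __name__ == "__main__":
    print(probe_amqp_handshake())
```

If the probe reports a timeout while the port accepts connections, the problem is more likely on the broker side than in the celery configuration.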
Any suggestions or comments would be much appreciated. I have gone through other links that talk about the size of the /var directory, but I think I have enough space.
Result of df -h:
Filesystem Size Used Avail Use% Mounted on
/dev/sda3 283G 99G 170G 37% /
udev 1.9G 4.0K 1.9G 1% /dev
tmpfs 388M 1.1M 387M 1% /run
none 5.0M 0 5.0M 0% /run/lock
none 1.9G 28M 1.9G 2% /run/shm
The following is the result of rabbitmqctl status:
Status of node 'rabbit@ansumanb-u12' ...
[{pid,12014},
{running_applications,[{rabbit,"RabbitMQ","3.3.2"},
{os_mon,"CPO CXC 138 46","2.2.7"},
{xmerl,"XML parser","1.2.10"},
{mnesia,"MNESIA CXC 138 12","4.5"},
{sasl,"SASL CXC 138 11","2.1.10"},
{stdlib,"ERTS CXC 138 10","1.17.5"},
{kernel,"ERTS CXC 138 10","2.14.5"}]},
{os,{unix,linux}},
{erlang_version,"Erlang R14B04 (erts-5.8.5) [source] [64-bit] [smp:4:4] [rq:4] [async-threads:30] [kernel-poll:true]\n"},
{memory,[{total,27919080},
{connection_procs,2704},
{queue_procs,5408},
{plugins,0},
{other_proc,9099992},
{mnesia,63776},
{mgmt_db,0},
{msg_index,34080},
{other_ets,784160},
{binary,12144},
{code,14685283},
{atom,1367393},
{other_system,1864140}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,1625165004},
{disk_free_limit,50000000},
{disk_free,181684699136},
{file_descriptors,[{total_limit,2},
{total_used,0},
{sockets_limit,0},
{sockets_used,0}]},
{processes,[{limit,1048576},{used,127}]},
{run_queue,0},
{uptime,20072}]
...done.
I've checked the rabbitmq logs and didn't find anything there. The Celery version is 3.1.12.
I created the rabbitmq virtual host and user with the following commands:
$ sudo rabbitmqctl add_user rabbitmquser <mypassword>
$ sudo rabbitmqctl add_vhost localvhost
$ sudo rabbitmqctl set_permissions -p localvhost rabbitmquser ".*" ".*" ".*"
Thanks
Recommended answer
I made a big mistake: I changed a setting that I did not understand. I am adding this answer for others' reference.
While installing rabbitmq, I changed the default value of ulimit from 1024 to 100 in /etc/default/rabbitmq-server.
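The effect of the lowered ulimit appears to be visible in the rabbitmqctl status output in the question, where file_descriptors reports total_limit 2 and sockets_limit 0. With no descriptors left for client sockets, a client hanging during the handshake would be expected. A small sketch, assuming the status output has the Erlang-term layout shown in the question, that pulls out those two values; the function name file_descriptor_limits is hypothetical.

```python
import re


def file_descriptor_limits(status_text):
    """Extract total_limit and sockets_limit from `rabbitmqctl status` output."""
    limits = {}
    for key in ("total_limit", "sockets_limit"):
        match = re.search(r"\{%s,(\d+)\}" % key, status_text)
        if match:
            limits[key] = int(match.group(1))
    return limits


# Excerpt copied from the status output in the question.
status_excerpt = """
{file_descriptors,[{total_limit,2},
{total_used,0},
{sockets_limit,0},
{sockets_used,0}]},
"""

if __name__ == "__main__":
    print(file_descriptor_limits(status_excerpt))
    # prints {'total_limit': 2, 'sockets_limit': 0}
```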
I changed the value back to 1024 and the issue is now fixed.
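To confirm which limit the running broker actually has, one option on Linux is to read /proc/<pid>/limits for the pid reported by rabbitmqctl status (12014 in the question). The sketch below assumes the usual "Max open files" row of that file; the helper names max_open_files and read_limits are hypothetical, and reading another user's process limits may require root.

```python
import re


def max_open_files(limits_text):
    """Return the soft 'Max open files' limit, or None if it is unlimited."""
    match = re.search(r"^Max open files\s+(\S+)\s+(\S+)", limits_text,
                      re.MULTILINE)
    if match is None:
        raise ValueError("no 'Max open files' row found")
    soft = match.group(1)
    return None if soft == "unlimited" else int(soft)


def read_limits(pid):
    """Read the raw limits table of a running process (Linux only)."""
    with open("/proc/%d/limits" % pid) as handle:
        return handle.read()


# Illustrative sample in the layout of /proc/<pid>/limits, not real output.
sample_limits = (
    "Limit                     Soft Limit           Hard Limit           Units\n"
    "Max open files            100                  100                  files\n"
)

if __name__ == "__main__":
    print(max_open_files(sample_limits))
```

For the live broker, a call such as max_open_files(read_limits(12014)) should return 1024 once the default has been restored and the server restarted.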