Celery &Rabbitmq:WARNING/MainProcess] 收到并删除未知消息.错误的目的地?!?- GIT 上的实验 [英] Celery &Rabbitmq:WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?- a experiment on the GIT

查看:9
本文介绍了Celery &Rabbitmq:WARNING/MainProcess] 收到并删除未知消息.错误的目的地?!?- GIT 上的实验的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近,我在做一个GIT项目的实验,以了解大数据处理框架.

Recently , I am doing an experiment on a GIT project to understanding the big data processing framework.

1、GIT项目:https://github.com/esperdyne/celery-message-processing

1、GIT project:https://github.com/esperdyne/celery-message-processing

我们有以下组件:

1、AMPQ broker(RabbitMQ):它作为一个消息缓冲区,作为一个邮箱为不同的用户交换消息!

1、AMPQ broker(RabbitMQ): it works as a message buffer, which works as a mail-box to exchange messages for different user!

2、worker:作为服务服务器为各种服务客户端提供服务.3、Queue(celery":它作为一个多处理容器,用于同时处理各种worker实例.

2、worker: it works as the service-server to provide service for various service client. 3、Queue("celery":it works as a multi-processing container which is used to handle the various worker instances at the same time.

关键配置如下:

我们使用对象proj/celery.py来定义app,定义如下:

We use the object proj/celery.py to define the app, the definition can be seen as below:

app = Celery('proj',
         broker='amqp://',
         backend='redis://localhost',
         include=['proj.tasks'])

在此处输入代码

当我们启动应用程序时:

when we start the app:

1、启动应用时,我们看到了rabbitmq产生的消息,但是celery无法处理这个消息.

1、 when we start the application, we have seen the message which is produced from the rabbitmq, yet the celery could not handle the message.

Parse.log 长这样:[2017-02-04 14:28:06,909: WARNING/MainProcess] 收到并删除未知消息.目的地错误?!?

Parse.log looks like this:[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

我们有以下问题:

4.2.1 AMQP 机制我们可以看到AMQP作为消息缓冲区工作,那么就会有一个消息发送者和一个消息获取者:

4.2.1 AMQP mechanism We can see that the AMQP works as the message buffer, then there will be a message sender and a message fetcher:

在上图中,谁是消息发送者,谁是消息获取者.

In the above diagram , who is the message sender and who is the message fetcher.

4.2.2 消息定义在我们的应用程序中,我们找不到代码来定义要发送或从 AMQP 接收的消息.

4.2.2 Message definition In our application , we can not find the code to define the Message to send ,or to receive form the AMQP.

4.2.3 消息监控我们如何监控 AMQP 中的消息发送和接收.希望老师指导我们解决问题,并给我们一些详细的说明

4.2.3 Message monitor How can we monitor the Message send and receive in the AMQP. Hope a teacher will guide us to solve the problem , and give us some detailed

芹菜经纪人机制介绍!

注意:可以在这里查看错误日志

note : the error log can be seen here

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?



 The full contents of the message body was: body: [[u'maildir/allen-       p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'xe5xca.xdbx00x00x00x00x00': None, 'P&5x07x00': None, 'T
KBx00x00x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'Nxfdx17=x00x00': 'gen17347@centos1', 'xcfbxddR': 'py', '9*xa8': None, 'xb7/bx84x00x00x00': 0, 'xe0x0bxfax89x00x00x00': None, 'xdfRxc4xx00x00x00x00x00': [None, None], 'T3x1d ': 'proj.tasks.parse', 'xaexbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'x11sx1fxd8x00x00x00x00': "('maildir/allen-p/inbox/1.',)", 'ULxa1xfcx00x00x00x00x00x00': '{}'}}


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'xe5xca.xdbx00x00x00x00x00': None, 'P&5x07x00': None, 'T
KBx00x00x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'Nxfdx17=x00x00': 'gen19722@centos1', 'xcfbxddR': 'py', '9*xa8': None, 'xb7/bx84x00x00x00': 0, 'xe0x0bxfax89x00x00x00': None, 'xdfRxc4xx00x00x00x00x00': [None, None], 'T3x1d ': 'proj.tasks.parse', 'xaexbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'x11sx1fxd8x00x00x00x00': "('maildir/allen-p/inbox/1.',)", 'ULxa1xfcx00x00x00x00x00x00': '{}'}}
 
enter code here

推荐答案

提供你正在使用的 celery 和 librabbitmq 的版本会很有帮助.由于我有一个非常相似的问题,我猜你使用的是 celery 4.0.2 和 librabbitmq 1.6.1.

It would be helpful to give the versions of celery and librabbitmq you are using. Since I had a very similar problem, I'll guess that you are using celery 4.0.2 and librabbitmq 1.6.1.

在这种情况下,这是一个已知的兼容性问题,您可以参考 https://github.com/celery/celery/issues/3675https://github.com/芹菜/librabbitmq/issues/93.

In such case, this is a known compatibility issue, you can refer to https://github.com/celery/celery/issues/3675 and https://github.com/celery/librabbitmq/issues/93.

第一个链接为您提供解决问题的建议:

The first link gives you recommendation to solve your problem namely:

  • 卸载 librabbitmq<代码>pip 卸载 librabbitmq(你可能要多次调用这个命令)

  • uninstall librabbitmq pip uninstall librabbitmq (you may have to call this command many times)

在你的 borker url 中将出现的 amqp 更改为 pyamqp.(但如果您使用的是配置文件,则不在您的配置文件中.这样做对我不起作用).

change the occurrences of amqp to pyamqp in your borker urls. (Though not in your config file if your are using one. Doing that did not work for me).

为了更准确地回答您的其他问题:您说得对,有一个发送者和一个提取者.

To answer more precisely your other questions: you are right saying that there is a sender and a fetcher.

发送者角色由调用 Celery(...) 时创建的应用承担.它的作用之一是充当任务的注册表,如果你在 app/base.py 中查看它的实现,你会发现它实现了一个方法 send_task,该方法由Task 类的 apply_async 方法.此方法的作用是通过线路将您的任务的编组版本发送到代理,以便工作人员可以获取它.用于传输消息的应用程序协议是 amqp,其实现是 librabbitmq.

The sender role is assumed by the app created when you call Celery(...). One of its role is to act as a registry of tasks, and if you look at its implementation in app/base.py, you'll see that it implements a method send_task which is directly called by the method apply_async of the Task class. This method's role is to send a marshalled version of your task through the wire up to the broker so it can be fetched by a worker. The application protocol used to transmit the message is amqp, for which an implementation is librabbitmq.

在线路的另一端,有另一个实例,由执行获取工作的工作人员启动.用 celery 的说法,它被称为 Consumer.你可以在 worker/consumer/consumer.py 中找到它的实现.您将看到它实现了一个 create_task_handler ,该 create_task_handler 又定义了 on_task_received 函数,该函数会引发您看到的错误.它是当从工作人员获取新任务并下一个被处理时调用的函数.

On the other side of the wire, there is another instance, launched by the worker which does the fetching work. In celery's parlance, it is called a Consumer. You can find its implementation in worker/consumer/consumer.py. You will see that it implements a create_task_handler which in turns defines the on_task_received functions that raises the error you are seeing. It is the function called when a new task is fetched from the worker and next in line to by processed.

因此建议的解决方案包括更改 amqp 协议的实现,以便在 on_task_received 中不会引发 TypeError (在我看来,这可能是由编码问题).

The solution suggested therefore consists in changing the implementation of the amqp protocol so that a TypeError is not raised in on_task_received (which it seems to me would be caused by an encoding issue).

我希望它能回答您的所有问题,并让您更清楚地了解芹菜的工作原理.最后我应该说,据我所知,Celery 的常规"使用永远不会要求您篡改这些内部结构,并且您可以通过实现自定义任务类和自定义后端来实现 99% 的目标.

I hope it answers all your questions and gives you a clearer view of how celery works. I should end by saying that to my knowledge a "conventional" use of Celery would never require you to tamper with those kind of internals, and that you can achieve 99% of what you may want by implementing custom task classes and custom backends for example.

这篇关于Celery &amp;Rabbitmq:WARNING/MainProcess] 收到并删除未知消息.错误的目的地?!?- GIT 上的实验的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆