Celery& Rabbitmq:警告/ MainProcess]已接收和删除的未知消息。目的地不正确?!?-GIT实验 [英] Celery &Rabbitmq:WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?- a experiment on the GIT

查看:54
本文介绍了Celery& Rabbitmq:警告/ MainProcess]已接收和删除的未知消息。目的地不正确?!?-GIT实验的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近,我正在一个GIT项目上进行实验,以了解大数据处理框架。

Recently , I am doing an experiment on a GIT project to understanding the big data processing framework.

1,GIT项目:https://github.com/esperdyne/celery-消息处理

1、GIT project:https://github.com/esperdyne/celery-message-processing

我们具有以下组件:

1,AMPQ代理( RabbitMQ ):消息缓冲区,它充当邮箱为不同用户交换消息!

1、AMPQ broker(RabbitMQ): it works as a message buffer, which works as a mail-box to exchange messages for different user!

2,工作者:它充当服务服务器,为各种服务客户端提供服务。
3,队列(芹菜 :它是一个多处理容器,用于同时处理各种工作程序实例。

2、worker: it works as the service-server to provide service for various service client. 3、Queue("celery":it works as a multi-processing container which is used to handle the various worker instances at the same time.

关键配置可以看如下:

我们使用对象proj / celery.py定义应用程序,定义如下:

We use the object proj/celery.py to define the app, the definition can be seen as below:

app = Celery('proj',
         broker='amqp://',
         backend='redis://localhost',
         include=['proj.tasks'])

在此处输入代码

启动应用程序时:

1,启动应用程序时,我们已经看到了Rabbitmq生成的消息,但是芹菜无法处理该消息。

1、 when we start the application, we have seen the message which is produced from the rabbitmq, yet the celery could not handle the message.

Parse.log看起来像这样:[2017-02-04 14:28:06,909:WARNING / MainProcess]收到并删除了未知消息。错误的目的地?!?

Parse.log looks like this:[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?

我们有以下问题:

4.2.1 AMQP机制
我们可以看到AMQP充当消息缓冲区,然后将成为消息发送者和消息提取者:

4.2.1 AMQP mechanism We can see that the AMQP works as the message buffer, then there will be a message sender and a message fetcher:

在上图中,谁是消息发送者,谁是消息提取者。

In the above diagram , who is the message sender and who is the message fetcher.

4.2.2消息定义
在我们的应用程序中,我们找不到用于定义要从AMQP发送或接收消息的代码。

4.2.2 Message definition In our application , we can not find the code to define the Message to send ,or to receive form the AMQP.

4.2.3消息监视器
我们如何在以下位置监视消息的发送和接收AMQP。
希望老师会指导我们解决问题,并给我们一些详尽的介绍

4.2.3 Message monitor How can we monitor the Message send and receive in the AMQP. Hope a teacher will guide us to solve the problem , and give us some detailed

关于芹菜经纪人机制的介绍!

introduction on the celery broker mechenism!

注意:错误日志可在此处查看

note : the error log can be seen here

[2017-02-04 14:28:06,909: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?



 The full contents of the message body was: body: [[u'maildir/allen-       p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'3cafda16-3e7c-44db-b05e-1327ef97ffc3'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'3d9de118-f9d0-3bee-9972-b6a4d4482446', u'task_id': u'1f4c728b-680d-4dde-98b9-b153d5282780'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f21c911e-f2ac-462e-9662-2efbd27bcf91', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 623422L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', 'N\xfd\x17=\x00\x00': 'gen17347@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': 'fc8f0bed-665f-4699-89dd-a56fc247ea8b', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}


[2017-02-04 15:47:22,463: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2017-02-04 15:47:22,473: INFO/MainProcess] mingle: searching for neighbors
[2017-02-04 15:47:23,503: INFO/MainProcess] mingle: sync with 2 nodes
[2017-02-04 15:47:23,504: INFO/MainProcess] mingle: sync complete
[2017-02-04 15:47:23,530: INFO/MainProcess] parse@centos1 ready.
[2017-02-04 15:47:24,890: INFO/MainProcess] sync with es_deploy@centos1
[2017-02-04 15:47:51,017: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: [[u'maildir/allen-p/inbox/1.'], {}, {u'errbacks': None, u'callbacks': None, u'chord': None, u'chain': [{u'chord_size': None, u'task': u'celery.group', u'args': [], u'immutable': False, u'subtask_type': u'group', u'kwargs': {u'tasks': [{u'chord_size': None, u'task': u'proj.tasks.deploy_db', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'765e5bbe-198f-405c-b10c-023d35e03981'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}, {u'chord_size': None, u'task': u'proj.tasks.deploy_es', u'args': [], u'options': {u'reply_to': u'bd66dd5c-516d-3b51-ab40-c8337a33b18e', u'task_id': u'7dacb897-d023-40b5-9874-e00b75107bbd'}, u'subtask_type': None, u'kwargs': {}, u'immutable': False}]}, u'options': {u'parent_id': None, u'task_id': u'f0d41289-33e2-4c8c-8d84-9d1d4c5a9c80', u'root_id': None}}]}] (801b)
{content_type:'application/json' content_encoding:'utf-8'
  delivery_info:{'consumer_tag': 'None4', 'redelivered': False, 'routing_key': 'parse', 'delivery_tag': 3L, 'exchange': ''} headers={'\xe5\xca.\xdb\x00\x00\x00\x00\x00': None, 'P&5\x07\x00': None, 'T\nKB\x00\x00\x00': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', 'N\xfd\x17=\x00\x00': 'gen19722@centos1', '\xcfb\xddR': 'py', '9*\xa8': None, '\xb7/b\x84\x00\x00\x00': 0, '\xe0\x0b\xfa\x89\x00\x00\x00': None, '\xdfR\xc4x\x00\x00\x00\x00\x00': [None, None], 'T3\x1d ': 'proj.tasks.parse', '\xae\xbf': '4d7754ed-0e36-4731-ae99-a84f42b8eba1', '\x11s\x1f\xd8\x00\x00\x00\x00': "('maildir/allen-p/inbox/1.',)", 'UL\xa1\xfc\x00\x00\x00\x00\x00\x00': '{}'}}
 
enter code here


推荐答案

有助于提供您正在使用的celery和librabbitmq的版本。由于我遇到了非常相似的问题,因此我猜您正在使用celery 4.0.2和librabbitmq 1.6.1。

It would be helpful to give the versions of celery and librabbitmq you are using. Since I had a very similar problem, I'll guess that you are using celery 4.0.2 and librabbitmq 1.6.1.

在这种情况下,这是一个已知的兼容性问题,您可以参考 https://github.com/celery/celery/issues/3675 ​​ https://github.com/celery/librabbitmq/issues/93

In such case, this is a known compatibility issue, you can refer to https://github.com/celery/celery/issues/3675 and https://github.com/celery/librabbitmq/issues/93.

第一个链接为您提供解决问题的建议:

The first link gives you recommendation to solve your problem namely:


  • 卸载librabbitmq

    pip卸载librabbitmq
    (您可能必须多次调用此命令)

  • uninstall librabbitmq pip uninstall librabbitmq (you may have to call this command many times)

在您的博克URL中将 amqp 的出现更改为 pyamqp 。 (如果您使用的是配置文件,则不在配置文件中。这样做对我不起作用。)

change the occurrences of amqp to pyamqp in your borker urls. (Though not in your config file if your are using one. Doing that did not work for me).

To更准确地回答您的其他问题:您说对了,有发送者和提取者。

To answer more precisely your other questions: you are right saying that there is a sender and a fetcher.

发件人角色由调用 Celery(...)时创建的应用程序承担。它的作用之一是充当任务注册表,如果您在app / base.py中查看其实现,则会看到它实现了方法 send_task 由Task类的方法 apply_async 直接调用。该方法的作用是通过电汇将任务的编组版本发送给代理,以便可以由工作人员获取。用于传输消息的应用协议是amqp,其实现是librabbitmq。

The sender role is assumed by the app created when you call Celery(...). One of its role is to act as a registry of tasks, and if you look at its implementation in app/base.py, you'll see that it implements a method send_task which is directly called by the method apply_async of the Task class. This method's role is to send a marshalled version of your task through the wire up to the broker so it can be fetched by a worker. The application protocol used to transmit the message is amqp, for which an implementation is librabbitmq.

在电汇的另一端,还有另一个实例,它是由工作人员启动的,它负责提取工作。用芹菜的话来说,它叫做消费者。您可以在worker / consumer / consumer.py中找到其实现。您将看到它实现了 create_task_handler ,它依次定义了 on_task_received 函数,该函数会引起您所看到的错误。当从工作程序中提取新任务,然后下一行被处理时,调用此函数。

On the other side of the wire, there is another instance, launched by the worker which does the fetching work. In celery's parlance, it is called a Consumer. You can find its implementation in worker/consumer/consumer.py. You will see that it implements a create_task_handler which in turns defines the on_task_received functions that raises the error you are seeing. It is the function called when a new task is fetched from the worker and next in line to by processed.

因此,建议的解决方案是更改amqp协议的实现,以使 TypeError 不会在 on_task_received (在我看来,这是由编码问题引起的)。

The solution suggested therefore consists in changing the implementation of the amqp protocol so that a TypeError is not raised in on_task_received (which it seems to me would be caused by an encoding issue).

我希望它能回答您的所有问题并让您更清楚地了解芹菜的工作原理。最后,我要说的是,据我所知,常规使用Celery永远不需要您篡改这种内部结构,例如,通过实现自定义任务类和自定义后端,您可以实现99%的期望值

I hope it answers all your questions and gives you a clearer view of how celery works. I should end by saying that to my knowledge a "conventional" use of Celery would never require you to tamper with those kind of internals, and that you can achieve 99% of what you may want by implementing custom task classes and custom backends for example.

这篇关于Celery& Rabbitmq:警告/ MainProcess]已接收和删除的未知消息。目的地不正确?!?-GIT实验的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆