Kombu/Celery消息传递 [英] Kombu/Celery messaging

查看:120
本文介绍了Kombu/Celery消息传递的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个发送&接收消息,kombu,并使用Celery对消息进行任务处理.Kombu alon,我可以正常收到消息.当我发送"Hello"时,kombu会收到"Hello".但是,当我添加任务时,kombu收到的是芹菜的任务ID.

I have a simple application that sends & receives messages, kombu, and uses Celery to task the message. Kombu alon, I can receive the message properly. when I send "Hello", kombu receives "Hello". But when I added the task, what kombu receives is the task ID of the celery.

我在此项目中的目的是让我可以安排发送和接收消息的时间,因此也可以安排Celery.

My purpose for this project is so that I can schedule when to send and receive messages, hence Celery.

我想知道的是为什么kombu接收任务ID而不是发送的消息?我已经搜索了并且没有找到关于此事的任何相关结果.我是使用此应用程序的初学者,希望能帮助您解决此问题.

What I would like to know is why is kombu receiving the task id instead of the sent message? I have searched and searched and have not found any related results on this matter. I am a beginner in using this applications and I would appreciate some help in fixing this matter.

我的代码:

task.py

from celery import Celery

app = Celery('tasks', broker='amqp://xx:xx@localhost/xx', backend='amqp://')

@app.task(name='task.add')
def add(x, y):
    return x+y

send.py

import kombu
from task import add
#declare connection with broker connection
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')

connection.connect()
if connection.connect() is False:
    print("not connected")
else:
    print("connected")

#checks if connection is okay


#rabbitmq connection
channel = connection.channel()

#queue & exchange for kombu
exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

#message here

x = input ("Enter first name: ")
y = input ("Enter last name: ")
result= add.delay(x,y)
print(result)



#syntax used for sending messages to queue
producer = kombu.Producer(channel, exchange)
producer.publish(result,
                 exchange = exchange,
                 routing_key='queue1')

print("Message sent: [x]")
connection.release()

receive.py

receive.py

import kombu

#receive
connection = kombu.Connection(hostname='xx',
                              userid='xx',
                              password='xx',
                              virtual_host='xx')
connection.connect()

channel = connection.channel()

exchange = kombu.Exchange('exchnge', type='direct')
queue = kombu.Queue('kombu_queue', exchange, routing_key='queue1')

print("Waiting for messages...")
def callback(body, message):
    print('Got message - %s' % body)
    message.ack()

consumer = kombu.Consumer(channel,
                          queues=queue,
                          callbacks=[callback])
consumer.consume()

while True:
    connection.drain_events()

我正在使用:

Kombu 3.0.26
Celery 3.1.18
RabbitMQ as the broker

我发送的内容:

xxx
yyy

海带收到的信息:

Got message - d22880c9-b22c-48d8-bc96-5d839b224f2a

推荐答案

我找到了解决我问题的答案,对于可能遇到此类问题的任何人,我将分享对我有用的答案.

我在这里找到了解决方案.

或在这里-用户 jennaliu 的回答可能如果第一个链接不起作用,可以为您提供帮助.

Or here - user jennaliu answer may probably help you if the first link didn't work.

这篇关于Kombu/Celery消息传递的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆