如何在python中做一个简单的Pika SelectConnection来发送消息? [英] How to do a simple Pika SelectConnection to send a message, in python?

查看:87
本文介绍了如何在python中做一个简单的Pika SelectConnection来发送消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将我的代码转换为通过 Pika 发送 rabbitmq 消息.我在理解如何使用异步连接(例如 SelectConnection)发送简单消息时遇到了很多麻烦.

I am trying to convert my code to send rabbitmq messages via Pika instead. I am having a lot of trouble understanding how to send a simple message using an asynchronous connection (such as SelectConnection).

在我使用 amqp 库的旧代码中,我只是声明了一个这样的类:

In my old code, which I use the amqp library I simply declare a class like this:

import amqp as amqp

class MQ():

    mqConn = None
    channel = None

    def __init__(self):
        self.connect()

    def connect(self):
        if self.mqConn is None:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

        elif not self.mqConn.connected:
            self.mqConn = amqp.Connection(host="localhost", userid="dev", password="dev", virtual_host="/", insist=False)
            self.channel = self.mqConn.channel()

    def sendMQ(self, message):
        self.connect()
        lMessage = amqp.Message(message)
        self.channel.basic_publish(lMessage, exchange="DevMatrixE", routing_key="dev_matrix_q") 

然后在我的代码中的其他地方调用 sendMQ("this is my message"),然后代码继续.我不需要听确认等.

And then elsewhere in my code I call sendMQ("this is my message"), and then the code continues. I do not need to listen for acknowledgements etc.

有人可以使用 pika 和 SelectConnection 编写一个简单的类,它也可以使用 sendMQ("这是我的消息") 发送消息吗?我看过 pika 的例子,但我不知道如何绕过 ioloop 和 KeyboardInterrupt.我想我只是不确定如何让我的代码在没有所有这些 try/excepts 的情况下继续运行......此外,我不确定如何通过所有回调传递我的消息......

Could someone please write a simple class utilizing pika and SelectConnection that would also work to just send a message using sendMQ("this is my message")? I've looked at the pika examples but I don't know how to get around the ioloop and KeyboardInterrupt. I guess I'm just not sure how to make my code continue to run without all these try/excepts... Also, not exactly sure how I can pass my message on through all the callbacks...

感谢任何帮助!

谢谢.

推荐答案

整个事情都是回调驱动的,因为它是一种异步的做事方式.异步消费者很容易理解,我们可以通过提供回调函数来获取消息.然而,至少对于初学者来说,发布者部分有点难以理解.

The whole thing is call back driven, as it is a async way of doing things. Async consumer is easy to understand, we can get the message by providing a call back function. However the publisher part is a bit difficult to understand, at least, for beginner.

通常我们需要一个队列来进行通信,并且发布者定期从中获取数据.

Usually we need a Queue to do the communication, and the publisher get data from it periodically.

使用 SelectConnection 的关键是将您的发布消息函数注册到事件循环中,这可以通过 connection.add_timeout 来完成.完成发布后,注册下一轮发布.

The key thing of using SelectConnection is to register your publish message function into the event loop, which can be done by connection.add_timeout. After you are done with the publish, register next round of your publish.

下一个问题是在哪里放置初始注册.初始注册可以在频道开放回调中进行.

The next question is where to put the the initial registration. The initial registration can be done in the channel open call back.

下面是一个代码片段,以便更好地理解.请注意,尚未准备好生产.因为它只以每秒 10 条的最大速度发布消息.需要调整发布间隔,一次回调发布更多消息.

Below is a code-snip for better understanding. Be aware, it is not production ready. Because it only publish message at max speed of 10 per second. You need to adjust the publish interval and publish more message at one call back.

class MQ(Object):
    def __init___(self, queue):
        self.queue = queue
    def on_channel_open(self, chn):
        self.channel = chn
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def schedule_next_message(self):
        try:
            msg = self.queue.get(True, 0.01)
            self.channel.basic_publish('YOUR EXCHANGE','YOUR ROUTING KEY',msg)
        catch Queue.Empty:
            pass
        self.connection.add_timeout(0.1, self.schedule_next_message)
    def on_open(self, conn):
        self.connection = conn
        self.connection.channel(on_open_callback=self.on_channel_open)
    def run(self):
        # create a connection
        self.connection = pika.SelectConnection(pika.ConnectionParameters(heartbeat=600,host=args.mq_ip),self.on_open)
        try:
            self.connection.ioloop.start()
        except Exception:
            print "exception in publisher"
            self.connection.close()
            self.connection.ioloop.start()

将 MQ(queue).run() 放在一个单独的线程中,任何时候你想把消息放到 mq 中,只要把它放到队列对象中即可.

Put MQ(queue).run() in a separate thread, and whenever you want to put message to mq, just put it into the queue object.

这篇关于如何在python中做一个简单的Pika SelectConnection来发送消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆