RabbitMQ 交换在一段时间后变得无响应 [英] RabbitMQ exchange becomes unresponsive after some amount of time

查看:181
本文介绍了RabbitMQ 交换在一段时间后变得无响应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有在 Docker 中运行的 RabbitMQ 服务器和两个连接到服务器并使用标头交换相互发送消息的 python 客户端.消息速率约为 10/s.一段时间后(大部分时间是在交换了 300-500 条消息之后),其中一个交换变得无响应.channel.basic_publish 调用无一例外地通过,但接收者没有收到任何消息.同样在rabbitmq仪表板上,这个交易所没有任何活动.rabbitmq 仪表板截图

I have RabbitMQ server running in Docker and two python clients that connect to the server and send messages to each other using headers exchange. Message rate is about 10/s. After some amount of time (most of the time after 300-500 messages have been exchanged) one of the exchange become unresponsive. channel.basic_publish call passes without any exception but receiver doesn't receive any messages. Also on rabbitmq dashboard there's no any activity on this exchange. rabbitmq dashboard screenshot

代码示例如下:

    import pika
    import threading
    import time
    import sys


    class Test:
        def __init__(
                self,
                p_username,
                p_password,
                p_host,
                p_port,
                p_virtualHost,
                p_outgoingExchange,
                p_incomingExchange
        ):
            self.__outgoingExch = p_outgoingExchange
            self.__incomingExch = p_incomingExchange
            self.__headers = {'topic': 'test'}
            self.__queueName = ''
            self.__channelConsumer = None
            self.__channelProducer = None
            self.__isRun = False

            l_credentials = pika.PlainCredentials(p_username, p_password)
            l_parameters = pika.ConnectionParameters(
                host=p_host,
                port=p_port,
                virtual_host=p_virtualHost,
                credentials=l_credentials,
                socket_timeout=30,
                connection_attempts=5,
            )

            self.__connection = pika.SelectConnection(
                parameters=l_parameters,
                on_open_callback=self.__on_connection_open,
                on_open_error_callback=self.__on_connection_open_error,
                on_close_callback=self.__on_connection_closed
            )

        def __on_connection_open(self, _conn):
            print("Connection opened")
            self.__connection.channel(on_open_callback=self.__on_consume_channel_open)
            self.__connection.channel(on_open_callback=self.__on_produce_channel_open)

        def __on_connection_open_error(self, _conn, _exception):
            print("Failed to open connection")

        def __on_connection_closed(self, _conn, p_exception):
            print("Connection closed: {}".format(p_exception))

        def __on_consume_channel_open(self, p_ch):
            print("Consumer channel opened")
            self.__channelConsumer = p_ch
            self.__channelConsumer.exchange_declare(
                exchange=self.__incomingExch,
                exchange_type="headers",
                callback=self.__on_consume_exchange_declared
            )

        def __on_consume_exchange_declared(self, p_method):
            print("Consumer exchange declared")
            self.__channelConsumer.queue_declare(
                queue='',
                callback=self.__on_queue_declare
            )

        def __on_queue_declare(self, p_method):
            print("Consumer queue declared")
            self.__queueName = p_method.method.queue
            self.__channelConsumer.queue_bind(
                queue=self.__queueName,
                exchange=self.__incomingExch,
                arguments=self.__headers,
            )
            self.__channelConsumer.basic_consume(self.__queueName, self.__onMessageReceived)

        def __on_produce_channel_open(self, p_ch):
            print("Producer channel opened")
            self.__channelProducer = p_ch
            self.__channelProducer.exchange_declare(
                exchange=self.__outgoingExch,
                exchange_type="headers",
                callback=self.__on_produce_exchange_declared
            )

        def __on_produce_exchange_declared(self, p_method):
            print("Producer exchange declared")
            l_publisher = threading.Thread(target=self.__publishProcedure)
            l_publisher.start()

        def __onMessageReceived(self, p_channel, p_method, p_properties, p_body):
            p_channel.basic_ack(p_method.delivery_tag)
            print("Message received: {}".format(p_body))

        def __publishProcedure(self):
            print("Start publishing")
            l_msgCounter = 0
            while self.__isRun:
                l_msgCounter += 1
                self.__publish(l_msgCounter)
                time.sleep(0.1)

        def __publish(self, p_msgCounter):
            self.__channelProducer.basic_publish(
                exchange=self.__outgoingExch,
                routing_key="#",
                body=str(p_msgCounter),
                properties=pika.BasicProperties(headers=self.__headers)
            )

        def run(self):
            self.__isRun = True
            try:
                self.__connection.ioloop.start()
            except KeyboardInterrupt:
                self.__isRun = False
                self.__connection.close()
                print("Exit...")

    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print("Provide node name [node1 | node2]")
            exit(-1)

        l_outgoingExch = ''
        l_incomingExch = ''
        if sys.argv[1] == 'node1':
            l_outgoingExch = 'node2.headers'
            l_incomingExch = 'node1.headers'
        elif sys.argv[1] == 'node2':
            l_outgoingExch = 'node1.headers'
            l_incomingExch = 'node2.headers'
        else:
            print("Wrong node name")
            exit(-1)
        l_testInstance = Test(
            p_username='admin',
            p_password='admin',
            p_host='localhost',
            p_port=5672,
            p_virtualHost='/',
            p_incomingExchange=l_incomingExch,
            p_outgoingExchange=l_outgoingExch
        )
        l_testInstance.run()

我将两个实例作为两个节点(node1 和 node2)运行,因此它们应该相互通信.

I run two instances as two nodes (node1 and node2) so they should communicate with each other.

有时我也会遇到这里描述的问题:流连接丢失:AssertionError(('_AsyncTransportBase._produce() tx 缓冲区大小下溢', -275, 1),)

Also sometimes I have the issue described here: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow', -275, 1),)

推荐答案

我发现我误用了鼠兔.如鼠兔 文档 指出,跨多个线程共享连接是不安全的.您可以与来自其他线程的连接进行交互的唯一方法是使用 add_callback_threadsafe 函数.在我的例子中,它应该是这样的:

I found that I misused pika. As pika documentation states, it's not safe to share connection across multiple threads. The only way you can interact with connection from other threads is to use add_callback_threadsafe function. In my example it should look like this:

   def __publishProcedure(self):
        print("Start publishing")
        l_msgCounter = 0
        while self.__isRun:
            l_msgCounter += 1
            l_cb = functools.partial(self.__publish, l_msgCounter)
            self.__connection.ioloop.add_callback_threadsafe(l_cb)
            time.sleep(0.1)

    def __publish(self, p_msgCounter):
        self.__channelProducer.basic_publish(
            exchange=self.__outgoingExch,
            routing_key="#",
            body=str(p_msgCounter),
            properties=pika.BasicProperties(headers=self.__headers)
        )

这篇关于RabbitMQ 交换在一段时间后变得无响应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆