RabbitMQ broken pipe error or lost messages

Problem description

Using the pika library's BlockingConnection to connect to RabbitMQ, I occasionally get an error when publishing messages:

Fatal Socket Error: error(32, 'Broken pipe')

This is from a very simple sub-process that takes some information out of an in-memory queue and sends a small JSON message into AMQP. The error only seems to come up when the system hasn't sent any messages for a few minutes.

Setup:

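import pika

# `parameters` is a pika connection-parameters object (e.g. pika.ConnectionParameters) built elsewhere
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(
    exchange='xyz',
    exchange_type='fanout',
    passive=False,
    durable=True,
    auto_delete=False
)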

The enqueue code catches any connection error and retries:

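import json
import logging
import socket

import pika


class EnqueueException(Exception):
    """Raised when the broker does not confirm a publish."""


def _enqueue(self, message_id, data):
    try:
        published = self.channel.basic_publish(
            self.amqp_exchange,
            self.amqp_routing_key,
            json.dumps(data),
            pika.BasicProperties(
                content_type="application/json",
                delivery_mode=2,  # persistent message
                message_id=message_id
            )
        )

        # Confirm delivery or retry. In pika 0.x the boolean returned by
        # basic_publish only reflects broker confirmation once
        # channel.confirm_delivery() has been called (pika 1.x returns None
        # and raises an exception on failure instead).
        if published:
            self.retry_count = 0
        else:
            raise EnqueueException("Message publish not confirmed.")

    except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
            pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
            pika.exceptions.UnroutableError, socket.timeout):
        self.retry_count += 1
        if self.retry_count < 5:
            logging.warning("Reconnecting and resending")
            if self.connection.is_open:
                self.connection.close()
            self.connect()
            self._enqueue(message_id, data)
        else:
            # Give up, but reset the counter so the next message gets its own retries
            self.retry_count = 0
            raise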
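
The recursive retry in `_enqueue` can also be written as a bounded loop, which makes the attempt limit explicit and leaves no shared counter to reset between messages. This is a minimal sketch, not the asker's code: `publish_with_retry`, its parameters and the exponential backoff are illustrative assumptions. In the question's class, `publish` would wrap the `basic_publish` call and `reconnect` would be `self.connect`.

```python
import logging
import time


def publish_with_retry(publish, reconnect, retryable, max_attempts=5, base_delay=0.5):
    """Call publish() up to max_attempts times, reconnecting between attempts.

    publish, reconnect: zero-argument callables (hypothetical stand-ins for
    the basic_publish call and self.connect()).
    retryable: tuple of exception types that trigger a reconnect and retry.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return publish()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error to the caller
            delay = base_delay * 2 ** (attempt - 1)  # exponential backoff
            logging.warning("Publish failed (attempt %d), reconnecting in %.1fs",
                            attempt, delay)
            time.sleep(delay)
            reconnect()
```

A retry policy only papers over the dropped connection; it does not explain why the connection dies while idle, which is what the rest of the question is about.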

This sometimes works on the second attempt. It often hangs for a while or just throws away messages before eventually throwing an exception (possibly related bug report). Since it only happens when the system is quiet for a few minutes I'm guessing it's due to a connection timeout. But AMQP has a heartbeat system and pika reportedly uses it (related bug report).

Why do I get this error or lose messages, and why won't the connection stay open when not in use?

Solution

From another bug report:

As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the server's suggested heartbeat interval (that's a bug too), I suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).

If processing a task in a consume block takes longer than the server-suggested heartbeat interval, the server will close the connection and the client won't be able to ack the message when it's done processing.
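
"Rely on TCP keep-alive instead" means letting the operating system probe an idle connection rather than exchanging AMQP heartbeat frames. The stdlib sketch below shows what that amounts to at the socket level; the function name and the 60/10/6 defaults are illustrative assumptions. Assuming pika 1.x, the same options can be passed through the `tcp_options` dictionary of `pika.ConnectionParameters` (keys such as `TCP_KEEPIDLE`) and heartbeats disabled with `heartbeat=0`; check the documentation for the pika version in use.

```python
import socket


def enable_tcp_keepalive(sock, idle=60, interval=10, count=6):
    """Turn on TCP keep-alive probes for sock.

    idle: seconds of inactivity before the first probe.
    interval: seconds between probes.
    count: unanswered probes before the kernel drops the connection.
    The tuning options are platform-specific (TCP_KEEPIDLE, TCP_KEEPINTVL
    and TCP_KEEPCNT exist on Linux), so each one is set only if this
    Python build exposes it.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    for name, value in (("TCP_KEEPIDLE", idle),
                        ("TCP_KEEPINTVL", interval),
                        ("TCP_KEEPCNT", count)):
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
    return sock
```

Keep-alive only detects a dead peer at the TCP level; it does not stop the broker from closing the connection for missed AMQP heartbeats, which is why the quoted suggestion pairs it with disabling heartbeats.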

An unreleased update may help with the issue.
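
For the long-running consume callback described in the quoted report, one common pattern is to do the work on another thread while the connection's own thread keeps servicing I/O, so heartbeats are still answered. This is a sketch with plain threads; `run_while_servicing` is a hypothetical helper, and `service` stands in for whatever keeps the connection serviced (for pika's BlockingConnection, assumed to be a call such as `connection.process_data_events(time_limit=0)`).

```python
import threading


def run_while_servicing(task, service, poll_interval=1.0):
    """Run task() in a worker thread and call service() while waiting.

    Returns task()'s result, or re-raises the exception it raised, on the
    calling thread, so the message can be acked from there afterwards.
    """
    outcome = {}

    def worker():
        try:
            outcome["result"] = task()
        except BaseException as exc:  # hand the error back to the caller
            outcome["error"] = exc

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    while True:
        thread.join(timeout=poll_interval)
        if not thread.is_alive():
            break
        service()  # e.g. let the connection process heartbeat frames
    if "error" in outcome:
        raise outcome["error"]
    return outcome["result"]
```

pika 1.x also provides `BlockingConnection.add_callback_threadsafe`, which lets a worker thread schedule the ack back onto the connection's thread instead of waiting like this.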

So I implemented a workaround. Every 30 seconds I publish a heartbeat message through the queue. This keeps the connection open and has the added benefit of confirming to clients that my application is up and running.
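
The workaround can be sketched as follows, assuming the publishing sub-process reads from a `queue.Queue`. The names `forward_with_heartbeats` and `STOP`, the JSON message shapes, and passing `publish` as a plain callable (in the question's code it would wrap `self.channel.basic_publish`) are illustrative assumptions, not the author's actual code.

```python
import json
import queue
import time

STOP = object()  # hypothetical sentinel that shuts the loop down


def forward_with_heartbeats(source, publish, interval=30.0):
    """Forward items from source to publish, plus a heartbeat every interval seconds.

    source: queue.Queue of JSON-serialisable items (or STOP).
    publish: callable taking a JSON string (stand-in for basic_publish).
    """
    next_beat = time.monotonic() + interval
    while True:
        remaining = next_beat - time.monotonic()
        if remaining <= 0:
            publish(json.dumps({"type": "heartbeat", "ts": time.time()}))
            next_beat = time.monotonic() + interval
            continue
        try:
            item = source.get(timeout=remaining)
        except queue.Empty:
            continue  # deadline reached; loop around to send the heartbeat
        if item is STOP:
            return
        publish(json.dumps({"type": "data", "payload": item}))
```

Heartbeats go out on a fixed schedule, matching "every 30 seconds", rather than only after a quiet period. RabbitMQ's heartbeat documentation states that any traffic, including published messages, counts as a valid heartbeat, which is presumably why this keeps the connection open, provided the interval is shorter than the negotiated heartbeat timeout.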
