特定收件人使用Redis和python使用故障安全消息广播 [英] Fail-safe message broadcasting to be consumed by a specific recipient using redis and python

查看:116
本文介绍了特定收件人使用Redis和python使用故障安全消息广播的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

因此redis 5.0刚引入了一项名为 Streams 的新功能.它们似乎非常适合分发消息以进行进程间通信:

So redis 5.0 freshly introduced a new feature called Streams. They seem to be perfect for distributing messages for inter process communication:

  • 在可靠性方面,它们超越了PUB/SUB事件消息传递的能力:PUB/SUB是一劳永逸的,无法保证收件人会收到消息
  • redis列表有些底层,但仍可以使用.但是,流针对性能以及上述用例进行了优化.

但是,由于此功能是相当新的功能,因此几乎没有任何Python(甚至通用的redis)手册,而且我还没有真正了解如何使流系统适应我的用例.

However, since this feature is quite new, there are barely any Python (or even general redis) manuals out there and I don't really get how to adapt the stream system to my use case.

我想要一个发布程序,该程序将消息推送到流并包含收件人信息(例如recipient: "user1").然后,我将有几个接收过程,所有接收过程都应检查新的流消息并比较它们是否是目标接收者.如果是,则应处理该消息并将其标记为已处理(已确认).

I want to have one publisher program that pushes messages to the stream and contains a recipient information (like recipient: "user1"). Then I will have several receiving processes that all should check for new stream messages and compare if they are the targeted recipient. If they are, they should process the message and mark it as processed (acknowledged).

但是,我并没有真正了解消费者群体,待定状态等等.有人可以给我一个有关我的小伪代码的真实例子吗?

However, I don't really get the idea of consumer groups, pending state and so on. Can anybody give me a real world example for my little pseudo-code?

sender.py

db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})

recipient.py (会有许多实例在运行,每个实例都有唯一的接收者ID)

recipient.py (there will be many instances of them running each with a unique recipient id)

recipient_id = "user1" # you get the idea...
db = Redis(...)
while True:
    message = db.the_stream.blocking_read("$") # "$" somehow means: just receive new messages
    if message.recipient == recipient_id:
        perform_task(message.task)
        message.acknowledge() # let the stream know it was processed
    else:
        pass # well, do nothing here since it's not our message. Another recipient instance should do the job.```

推荐答案

使用您提供的示例和伪代码,让我们想象一下:

With the example and pseudo code you've given, let's imagine that:

  • recipient.user1每分钟收到60条消息
  • perform_task()方法执行需要2秒钟.
  • the recipient.user1 is getting 60 messages a minute
  • and the perform_task() method takes 2 seconds to execute.

这里将发生的事情是显而易见的:新消息传入与被处理之间的等待时间只会随着时间的增长而增加,与实时处理"的距离越来越远.

What will happen here is obvious: the latency between a new message coming in and having it be processed will only grow over time, drifting further and further from "real-time processing".

system throughput = 30 messages/minute

要解决此问题,您可能需要为user1创建使用者组.在这里,您可以有4个不同的python进程并行运行,而所有4个python进程都加入了user1的同一组.现在,当有消息传给user1时,四个工作人员中的一个便会捡起它并perform_task().

To get around this, you might want create a consumer group for user1. Here you could have 4 distinct python processes running in parallel with all 4 joined in the same group for user1. Now when a message comes in for user1 one of the 4 workers will pick it up and perform_task().

system throughput = 120 message/minute

在您的示例中,message.acknowledge()实际上并不存在,因为您的流阅读器是单独的(XREAD命令).

In your example, the message.acknowledge() doesn't actually exist, because your stream reader is alone (XREAD commands).

如果是一个小组,对消息的确认就变得至关重要,这就是redis知道小组成员中的一个确实处理了该消息的方式,因此它可能会继续前进"(可能会忘记该消息是等待确认).使用组时,有一些服务器端逻辑可确保将每条消息一次一次传递给使用者组工作人员之一(XGROUPREAD命令).客户端完成后,它将发出该消息的确认(XACK命令),以便服务器端消费者组缓冲区"可以将其删除并继续.

If it were a group, the acknowledgement of messages becomes essential, that's how redis knows that one of the groups members did in fact handle that message, so it may "move on" (it may forget the fact that that message was pending acknowledgement). When you're using groups, there's a little bit of server side logic in place to ensure that every message is to delivered to one of the consumer groups workers once (XGROUPREAD commands). When the client has finished, it issues an acknowledgement of that message (XACK commands) so that the server side "consumer group buffer" may delete it and move on.

想象一下,如果一名工人死亡并且从未确认过该消息.使用使用者组,您可以警惕这种情况(使用XPENDING命令),并通过例如重试在另一个使用者中处理同一条消息来对它们采取行动.

Imagine if a worker died and never acknowledged the message. With a consumer group, you're able to watch out for this situation (using XPENDING commands) and act upon them by for example retrying to process the same message in another consumer.

当您不使用组时,redis服务器不需要继续",确认"成为100%的客户端/业务逻辑.

When you're not using groups, the redis server doesn't need to "move on", the "acknowledgement" becomes 100% client side/business logic.

这篇关于特定收件人使用Redis和python使用故障安全消息广播的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆