如何让 zeromq PUB/SUB 丢弃旧消息而不是新消息(对于实时提要)? [英] Howto make zeromq PUB/SUB drop old messages instead of new (for realtime feeds)?

查看:31
本文介绍了如何让 zeromq PUB/SUB 丢弃旧消息而不是新消息(对于实时提要)?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有一个PUB服务器,它zmq_send()<发送实时消息code>SUB 客户端.如果客户端很忙并且不能zmq_recv() 足够快的消息,那么消息将在客户端(和/或服务器)中缓冲.

Say I have a PUB server that zmq_send()'s realtime messages to SUB client. If client is busy and can not zmq_recv() messages quick enough, then messages will be buffered in client (and/or server).

如果缓冲区变得太大(高水位线),那么新消息将被丢弃.对于实时消息,这与人们想要的相反.应删除旧消息,为新消息腾出位置.

If the buffer grows too large (high water mark) then NEW messages will be dropped. For realtime messages this is the opposite of what one wants. OLD messages should be dropped to make place for NEW ones.

有没有办法做到这一点?

Is there some way to do this?

理想情况下,我希望 SUB 客户端的接收队列为空或仅包含最新消息.当收到新消息时,它将替换旧消息.(我猜这里的问题是当队列为空时,客户端会在 zmq_recv() 上阻塞,这样做是浪费时间.)

Ideally I would like the SUB client's receive queue to be either empty or contain the most recent message only. When a new message is received it would replace the old one. ( I guess the problem here would be that the client would block on zmq_recv() when the queue is empty, wasting time doing so. )

那么在 ZeroMQ 中通常如何实现实时提要?

So how are realtime feeds usually implemented in ZeroMQ?

推荐答案

我会在这里回答我自己的问题.设置 ZMQ_CONFLATE "Keep only last message" 看起来很有希望,但它不适用于订阅过滤器.它只在队列中保留一条消息.如果您有多个过滤器,则另一种过滤器类型的新旧消息都会被丢弃.

I'll answer my own question here. The setting ZMQ_CONFLATE "Keep only last message" seemed promising but it doesn't work with subscription filters. It only ever keeps one message in the queue. If you have more than one filter, both old and new messages of the other filters type gets thrown away.

同样,zeromq 指南的建议是简单地杀死慢速订阅者,但这似乎不是现实的解决方案.让具有不同读取速度的订阅者订阅同一个快速发布者,应该是一个正常的用例.其中一些订阅者可能生活在速度较慢的计算机上,而另一些则生活在快速的计算机上,等等.ZeroMQ 应该能够以某种方式处理这种情况.

Likewise the recommendation of the zeromq guide to simply to kill slow subscribers, but that doesn't seem like realistic solution. Having subscribers with different read speeds, subscribed to the same fast publisher, should be a normal use case. Some of these subscribers might live on slow computers others on fast ones, etc. ZeroMQ should be able to handle that somehow.

http://zguide.zeromq.org/page:all#Slow-Subscriber-Detection-Suicide-Snail-Pattern

我最终在客户端手动删除了旧的排队消息.它似乎工作正常.我以这种方式向客户端订阅了不到 3 毫秒的消息(通过 tcp localhost).即使在我有 5000 条 10 秒旧消息的情况下,这也有效,在队列中的那些少数实时消息在后面.这对我来说已经足够了.

I ended up doing manual dropping of old queued up messages on the client side. It seems to work fine. I get subscribed messages to the client that are less than 3ms old (through tcp localhost) that way. This works even in cases where I have five thousand, 10 second old messages, in the queue in front of those few real-time message at the back. This is good enough for me.

我不禁认为这是图书馆应该提供的东西.它可能会做得更好.

I cant help but think this is something that should be provided by the library. It could probably do a better job of it.

无论如何这里是客户端,旧消息丢弃,代码:

Anyways here is the client side, old message dropping, code:

bool Empty(zmq::socket_t& socket) {
    bool ret = true;
    zmq::pollitem_t poll_item = { socket, 0, ZMQ_POLLIN, 0 };
    zmq::poll(&poll_item, 1, 0); //0 = no wait
    if (poll_item.revents & ZMQ_POLLIN) {
        ret = false;
    }
    return ret;
}

std::vector<std::string> GetRealtimeSubscribedMessageVec(zmq::socket_t& socket_sub, int timeout_ms)
{
    std::vector<std::string> ret;

    struct MessageTmp {
        int id_ = 0;
        std::string data_;
        boost::posix_time::ptime timestamp_;
    };

    std::map<int, MessageTmp> msg_map;

    int read_msg_count = 0;
    int time_in_loop = 0;
    auto start_of_loop = boost::posix_time::microsec_clock::universal_time();
    do {
        read_msg_count++;

        //msg format sent by publisher is: filter, timestamp, data
        MessageTmp msg;
        msg.id_ = boost::lexical_cast<int>(s_recv(socket_sub));
        msg.timestamp_ = boost::posix_time::time_from_string(s_recv(socket_sub));
        msg.data_ = s_recv(socket_sub);

        msg_map[msg.id_] = msg;

        auto now = boost::posix_time::microsec_clock::universal_time();
        time_in_loop = (now - start_of_loop).total_milliseconds();
        if (time_in_loop > timeout_ms) {
            std::cerr << "Timeout reached. Publisher is probably sending messages quicker than we can drop them." << std::endl;
            break;
        }
    } while ((Empty(socket_sub) == false)); 

    if (read_msg_count > 1) {
        std::cout << "num of old queued up messages dropped: " << (read_msg_count - 1) << std::endl;
    }

    for (const auto &pair: msg_map) {
        const auto& msg_tmp = pair.second;

        auto now = boost::posix_time::microsec_clock::universal_time();
        auto message_age_ms = (now - msg_tmp.timestamp_).total_milliseconds();

        if (message_age_ms > timeout_ms) {
            std::cerr << "[SUB] Newest message too old. f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
        }
        else {
            std::cout << "[SUB] f:" << msg_tmp.id_ << ", age: " << message_age_ms << "ms, s:" << msg_tmp.data_.size() << std::endl;
            ret.push_back(msg_tmp.data_);
        }
    }

    return ret;
}

这篇关于如何让 zeromq PUB/SUB 丢弃旧消息而不是新消息(对于实时提要)?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆