Reactor 中`groupBy` 组的并行调度 [英] Parallel dispatch of `groupBy` groups in Reactor

查看:60
本文介绍了Reactor 中`groupBy` 组的并行调度的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在学习 Reactor,我想知道如何实现某种行为.假设我有一个传入消息流.每条消息都与某个实体相关联并包含一些数据.

I'm learning Reactor, and I'm wondering how to achieve a certain behavior. Let's say I have a stream of incoming messages. Each message is associated with a certain entity and contains some data.

interface Message {
    String getEntityId();
    Data getData();
}

与不同实体相关的消息可以并行处理.但是,与任何单个实体有关的消息必须一次处理一个,即实体 "abc" 的消息 2 的处理不能开始,直到实体 "abc"<的消息 1 的处理才能开始/code> 已完成.在处理消息时,应该缓冲该实体的进一步消息.其他实体的消息可以畅通无阻地进行.可以将其视为每个实体在线程上运行如下代码:

Messages relating to different entities can be processed in parallel. However, messages pertaining to any single entity must be processed one at a time, i.e. processing of message 2 for entity "abc" can not start until processing of message 1 for entity "abc" has finished. While processing of a message is underway, further messages for that entiy should be buffered. Message for other entities can proceed unimpeded. One can think of it as there being on thread per entity running code like this:

public void run() {
    for (;;) {
        // Blocks until there's a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

如何在不阻塞的情况下使用 React 实现这一点?总消息速率可能很高,但每个实体的消息速率将非常低.实体集可能非常大,不一定事先知道.

How can I achieve this with React without blocking? The total message rate may be high, but message rate per entity will be very low. The set of entities can be very large, and is not necessarily known in advance.

我想它可能看起来像这样,但我不知道.

I guess it might look something like this, but I don't know.

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */)))
                    /* ... */
}

public static Stream<Message> incomingMessages() { /* ... */ }

推荐答案

有了 ProjectReactor 你可以这样解决:

With ProjectReactor you can solve it in this way:

@Test
public void testMessages() {
    Flux.fromStream(incomingMessages())
            .groupBy(Message::getEntityId)
            .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages
            .subscribe( //create consumer for main stream
                    stream ->
                            stream.subscribe(this::processMessage) // create consumer for group stream
            );
}

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}

我认为类似的解决方案可能在 RxJava

I think the similar solution could be in RxJava

这篇关于Reactor 中`groupBy` 组的并行调度的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆