Thread pool to process messages in parallel, but preserve order within conversations

Question

I need to process messages in parallel, but preserve the processing order of messages with the same conversation ID.

Example:
Let's define a Message like this:

class Message {
    Message(long id, long conversationId, String someData) {...}
}

Suppose the messages arrive in the following order:
Message(1, 1, "a1"), Message(2, 2, "a2"), Message(3, 1, "b1"), Message(4, 2, "b2").

I need message 3 to be processed after message 1, since messages 1 and 3 have the same conversation ID (similarly, message 4 should be processed after message 2 for the same reason).
I don't care about the relative order between e.g. 1 and 2, since they have different conversation IDs.
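
To make the requirement concrete, here is a small checker for a processing log: within each conversation, messages must come out in arrival order, while messages of different conversations may interleave freely. It is a sketch that assumes, as in the example, that message ids increase with arrival order; the class name ConvoOrderCheck and the trimmed-down Message (id and conversation ID only) are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConvoOrderCheck {

    // Hypothetical stand-in for the question's Message: only the fields the check needs.
    static final class Message {
        final long id;
        final long conversationId;

        Message(long id, long conversationId) {
            this.id = id;
            this.conversationId = conversationId;
        }
    }

    /**
     * Returns true if, within every conversation, messages appear in increasing id order.
     * Assumes ids are assigned in arrival order, as in the example above.
     */
    static boolean preservesConversationOrder(List<Message> processed) {
        Map<Long, Long> lastIdByConvo = new HashMap<>();
        for (Message m : processed) {
            Long previous = lastIdByConvo.put(m.conversationId, m.id);
            if (previous != null && previous > m.id) {
                return false; // an earlier message of this conversation was processed later
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Message m1 = new Message(1, 1), m2 = new Message(2, 2), m3 = new Message(3, 1), m4 = new Message(4, 2);
        // 2 before 1 is fine (different conversations); 1 before 3 and 2 before 4 are kept.
        System.out.println(preservesConversationOrder(List.of(m2, m1, m4, m3))); // true
        // 3 before 1 breaks conversation 1's order.
        System.out.println(preservesConversationOrder(List.of(m3, m1, m2, m4))); // false
    }
}
```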

I would like to reuse the functionality of Java's ThreadPoolExecutor as much as possible, to avoid having to replace dead threads manually in my code, etc.

Update: The number of possible 'conversation-ids' is not limited, and there is no time limit on a conversation. (I personally don't see it as a problem, since I can have a simple mapping from a conversationId to a worker number, e.g. conversationId % totalWorkers).
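
The conversationId % totalWorkers mapping from this update can be sketched as an array of single-threaded executors ("striping"): every message of a conversation lands on the same worker, and that worker runs its tasks in submission order. This is a minimal sketch; StripedExecutor and its methods are hypothetical names. It uses Math.floorMod rather than %, because % returns a negative value for a negative ID (or hash), which would be an invalid array index. Each Executors.newSingleThreadExecutor() also replaces its thread if it dies from a task failure, which covers the "dead threads" concern.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StripedExecutor {

    // One single-threaded executor per worker; each runs its tasks in submission order.
    private final ExecutorService[] workers;

    public StripedExecutor(int totalWorkers) {
        workers = new ExecutorService[totalWorkers];
        for (int i = 0; i < totalWorkers; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // floorMod rather than %, so a negative conversationId still yields a valid index.
    int workerIndex(long conversationId) {
        return (int) Math.floorMod(conversationId, (long) workers.length);
    }

    public void submit(long conversationId, Runnable task) {
        workers[workerIndex(conversationId)].execute(task);
    }

    public void shutdownAndAwait() {
        for (ExecutorService w : workers) {
            w.shutdown(); // already-queued tasks still run
        }
        try {
            for (ExecutorService w : workers) {
                w.awaitTermination(1, TimeUnit.MINUTES);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        StripedExecutor striped = new StripedExecutor(2);
        Map<Long, List<Integer>> seen = new ConcurrentHashMap<>();
        for (int i = 1; i <= 20; i++) {
            long convo = i % 3;
            int msg = i;
            striped.submit(convo, () -> seen
                    .computeIfAbsent(convo, c -> Collections.synchronizedList(new ArrayList<>()))
                    .add(msg));
        }
        striped.shutdownAndAwait();
        System.out.println(seen); // each conversation's list is in increasing order
    }
}
```

With two workers and three conversations, conversations 0 and 2 share worker 0, which is exactly the coupling that the next update objects to.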

Update 2: There is one problem with a solution with multiple queues, where the queue number is determined by e.g. 'index = Objects.hash(conversationId) % total': if it takes a long time to process some message, all messages with the same 'index' but different 'conversationId' will wait even though other threads are available to handle it. That is, I believe solutions with a single smart blocking queue would be better, but it's just an opinion, I am open to any good solution.
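
The head-of-line blocking described in Update 2 can be reproduced deterministically with a latch: a slow message for conversation 1 occupies its stripe, and a message for conversation 5, which maps to the same stripe, cannot start even though the other three stripes are idle. This sketch assumes four stripes indexed by Math.floorMod(conversationId, 4) instead of Objects.hash; the class and method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class StripeBlockingDemo {

    /** Returns whether convo 5's message ran while convo 1's slow message was still running. */
    static boolean otherConvoRanWhileBlocked() throws InterruptedException {
        int total = 4;
        ExecutorService[] stripes = new ExecutorService[total];
        for (int i = 0; i < total; i++) {
            stripes[i] = Executors.newSingleThreadExecutor();
        }
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean convo5Ran = new AtomicBoolean(false);

        // Conversations 1 and 5 map to the same stripe (1 mod 4 == 5 mod 4 == 1).
        stripes[(int) Math.floorMod(1L, (long) total)].execute(() -> {
            try {
                release.await(); // simulates a message that takes a long time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        stripes[(int) Math.floorMod(5L, (long) total)].execute(() -> convo5Ran.set(true));

        Thread.sleep(200); // stripes 0, 2 and 3 are idle the whole time, yet cannot help
        boolean ranWhileBlocked = convo5Ran.get();

        release.countDown(); // the slow message finishes; only now can convo 5 run
        for (ExecutorService s : stripes) {
            s.shutdown();
        }
        for (ExecutorService s : stripes) {
            s.awaitTermination(1, TimeUnit.MINUTES);
        }
        return ranWhileBlocked;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(otherConvoRanWhileBlocked()); // false: convo 5 waited behind convo 1
    }
}
```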

Do you see an elegant solution for this problem?

Recommended answer

I had to do something very similar some time ago, so here is an adaptation.

(see it online: http://rextester.com/IWPLAV52962)

It's actually the exact same base need, but in my case the key was a String, and more importantly the set of keys was not growing indefinitely, so here I had to add a "cleanup scheduler". Other than that it's basically the same code, so I hope I have not lost anything serious in the adaptation process. I tested it, looks like it works. It's longer than other solutions, though, perhaps more complex...

Basic idea:

  • MessageTask wraps a message into a Runnable, and notifies queue when it is complete
  • ConvoQueue: blocking queue of messages for a conversation. Acts as a prequeue that guarantees the desired order. See this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → …
  • MessageProcessor has a Map<Long, ConvoQueue>, and an ExecutorService
  • messages are processed by any thread in the executor; the ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally. So a "difficult" message will not block other conversations from being processed, unlike some other solutions. That property was critically important in our case; if it's not that critical for you, maybe a simpler solution is better
  • cleanup with ScheduledExecutorService (takes 1 thread)
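
The per-conversation "prequeue feeding a shared pool" idea in the list above can also be sketched more compactly by chaining CompletableFutures per conversation ID. This is a swapped-in technique, not the answer's code: ChainedConvoExecutor and its methods are hypothetical. Like ConvoQueue, it keeps at most one task per conversation in flight, lets any pool thread run it, and drops the map entry once a conversation goes idle, which takes the place of the periodic cleanup scheduler.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChainedConvoExecutor {

    private final ExecutorService executor;
    // Last stage of each conversation's chain; absent once the conversation has gone idle.
    private final Map<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    public ChainedConvoExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    public void submit(long conversationId, Runnable task) {
        // compute() is atomic per key, so each task is appended after the previous one
        // of the same conversation; different conversations never wait on each other.
        CompletableFuture<Void> next = tails.compute(conversationId, (id, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                        .handle((ignored, error) -> null) // run even if the previous task threw
                        .thenRunAsync(task, executor));   // any pool thread may run it
        // Drop the entry when this stage finishes, unless a newer stage was chained meanwhile.
        next.whenComplete((ignored, error) -> tails.remove(conversationId, next));
    }

    /** Blocks until everything submitted so far has run; call it after the last submit(). */
    void awaitIdle() {
        CompletableFuture.allOf(tails.values().toArray(new CompletableFuture<?>[0])).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ChainedConvoExecutor convos = new ChainedConvoExecutor(pool);
        List<String> processed = new CopyOnWriteArrayList<>();
        for (int i = 1; i < 100; i++) {
            long convo = i % 7;
            String data = "hi " + i;
            convos.submit(convo, () -> processed.add(convo + ":" + data));
        }
        convos.awaitIdle();
        pool.shutdown();
        System.out.println(processed); // interleaved across convos, in order within each convo
    }
}
```

The handle(...) step matters: without it, a task that throws would complete its stage exceptionally, and thenRunAsync would then skip every later message of that conversation.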

Visually:

   ConvoQueues              ExecutorService's internal queue
                            (shared, but has at most 1 MessageTask per convo)
Convo 1   ########   
Convo 2      #####   
Convo 3    #######                        Thread 1
Convo 4              } →    ####    → {
Convo 5        ###                        Thread 2
Convo 6  #########   
Convo 7      #####   

(Convo 4 is about to be deleted)

All the classes are below (MessageProcessorTest can be executed directly):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

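    public void addMessageToProcess(Message message) {
        // Look up the queue and enqueue under the same lock, so removeEmptyQueues()
        // cannot drop the queue between the two steps (which would let a second
        // ConvoQueue for the same convo run a message concurrently)
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }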

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized(this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    void complete(MessageTask task) {
        synchronized(this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

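    boolean isEmpty() {
        synchronized(this) {
            // Only "empty" when nothing is waiting AND nothing is running: removing a
            // queue whose last task is still active would let a new ConvoQueue for the
            // same convo start the next message before that task finishes
            return activeTask == null && queue.isEmpty();
        }
    }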

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized(this) {
            queue.add(task);
            runNextIfPossible();
        }
    }

}

// MessageTask.java
public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
        }
        System.out.println(message);
    }

}

// Message.java
class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    @Override
    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i=1; i<100; i++) {
            test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
        }
    }
}

Output (order is preserved within each convo ID, the 2nd field):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

The test above gave me enough confidence to share it, but I'm slightly worried that I might have forgotten details for pathological cases. It has been running in production for years without hitches (although with more code that lets us inspect it live when we need to see what's happening, why a certain queue takes time, etc.; there was never a problem with the system above in itself, only sometimes with the processing of a particular task).

You can test it online on rextester (see the link above). Alternatively, copy that gist into rextester and press "Compile & Execute".