确保连续的项目顺序的PriorityBlockingQueue [英] PriorityBlockingQueue that ensures consecutive item sequence

查看:59
本文介绍了确保连续的项目顺序的PriorityBlockingQueue的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在接收一系列消息,我想按它们的顺序处理它们.每个消息都有一个序列号.有一个线程池接收它们.我想将它们放入 PriorityBlockingQueue 之类的阻塞队列中,并以正确的顺序读取它们,阻塞直到下一条连续消息可用为止.

I'm receiving a sequence of messages, and I want to process them in their sequential order. Each message has a sequence number. There's a pool of threads receiving them. I want to put them into a blocking queue like a PriorityBlockingQueue, and read them in the right order, blocking until the next consecutive message is available.

例如给出以下代码:

ConsecutiveBlockingQueue<Integer> q = new ConsecutiveBlockingQueue<>();

new Thread (()->{ q.put(0); q.put(2); }).start();
new Thread (()->{ q.put(1); q.put(3); }).start();

ArrayList<Integer> ordered = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
    ordered.add(q.take());
}
System.out.println(ordered);

我希望它打印[0,1,2,3]

I want it to print [0, 1, 2, 3]

推荐答案

这是经过最低程度测试的类,似乎可以满足我的要求.欢迎发表评论.

Here's a minimally tested class that seems to do what I want. Comments welcome.

package com.ciphercloud.sdp.common;

import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToIntFunction;

public class ConsecutiveBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final ToIntFunction <E> ixFunction;
    // blocking queue for consecutive items. Take operations will look into this queue
    LinkedBlockingQueue <E> bQueue = new LinkedBlockingQueue<>();

    // buffering/ordering queue for items that are out of sequence
    PriorityQueue <E> pQueue = new PriorityQueue<>();

    ReentrantLock lock = new ReentrantLock();

    private int nextIx;

    ConsecutiveBlockingQueue(ToIntFunction <E> ixFunction) {
        this(0, ixFunction);
    }

    ConsecutiveBlockingQueue(int startIx, ToIntFunction <E> ixFunction) {
        nextIx = startIx;
        this.ixFunction = ixFunction;
    }

    @Override
    public Iterator <E> iterator() {
        return bQueue.iterator();
    }

    @Override
    public int size() {
        return bQueue.size();
    }

    protected BlockingQueue <E> delegate() {
        return bQueue;
    }

    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    @Override
    public int drainTo(Collection <? super E> c) {
        return bQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection <? super E> c, int maxElements) {
        return bQueue.drainTo(c, maxElements);
    }

    @Override
    public void put(E e) {
        offer(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        offer(e);
        return true;
    }

    @Override
    public boolean offer(E e) {
        lock.lock();
        try {
            if (ixFunction.applyAsInt(e) == nextIx) {
                // offered item is the next consecutive expected one
                // put it directly into the blocking queue
                bQueue.offer(e);
                nextIx++;

                // if there are any buffered items in the pQueue, move them
                // into the blocking queue while they follow consecutively
                while(true) {
                    E next = pQueue.peek();
                    if(next == null || ixFunction.applyAsInt(next) != nextIx) {
                        // no more items in pQueue, or next item is not consecutive
                        break;
                    }
                    pQueue.poll();
                    bQueue.offer(next);
                    nextIx++;
                }
            } else {
                // offered item is not consecutively next. Buffer it in pQueue
                pQueue.offer(e);
            }
        } finally {
            lock.unlock();
        }

        return true;
    }


    @Override
    public E take() throws InterruptedException {
        return bQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return bQueue.poll(timeout, unit);
    }


    @Override
    public E poll() {
        return bQueue.poll();
    }

    @Override
    public E peek() {
        return bQueue.peek();
    }
}

这篇关于确保连续的项目顺序的PriorityBlockingQueue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆