Writing a multithreaded mapping iterator in Java

Question

I've got a general-purpose mapping iterator, something like this:

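import java.util.Iterator;

// Action<F, T> declares a single method: T process(F f).
class Mapper<F, T> implements Iterator<T> {

  private Iterator<F> input;
  private Action<F, T> action;

  public Mapper(Iterator<F> input, Action<F, T> action) {
    this.input = input;
    this.action = action;
  }

  public boolean hasNext() {
    return input.hasNext();
  }

  public T next() {
    return action.process(input.next());
  }

  // Required by Iterator on JDK 6 (there are no default methods before Java 8).
  public void remove() {
    throw new UnsupportedOperationException();
  }
}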

Now, given that action.process() can be time-consuming, I want to gain performance by using multiple threads to process items from the input in parallel. I want to allocate a pool of N worker threads and assign items to these threads for processing. This should happen "behind the scenes" so the client code just sees an Iterator. The code should avoid holding either the input or the output sequence in memory.

To add a twist, I want two versions of the solution, one which retains order (the final iterator delivers items in the same order as the input iterator) and one of which does not necessarily retain order (each output item is delivered as soon as it is available).

I've sort of got this working, but the code seems convoluted and unreliable and I'm not confident it's using best practice.

Any suggestions on the simplest and most robust way of implementing this? I'm looking for something that works in JDK 6, and I want to avoid introducing dependencies on external libraries/frameworks if possible.

Answer

I'd use a thread pool for the threads and a BlockingQueue to carry the results out of the pool.

This seems to work with my simple test cases.

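import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

interface Action<F, T> {

    public T process(F f);

}

class Mapper<F, T> implements Iterator<T> {

    protected final Iterator<F> input;
    protected final Action<F, T> action;

    public Mapper(Iterator<F> input, Action<F, T> action) {
        this.input = input;
        this.action = action;
    }

    @Override
    public boolean hasNext() {
        return input.hasNext();
    }

    @Override
    public T next() {
        return action.process(input.next());
    }

    // Iterator.remove() has no default implementation before Java 8, so JDK 6 needs this.
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}

class ParallelMapper<F, T> extends Mapper<F, T> {

    // The pool.
    private final ExecutorService pool;
    // The queue the workers put their results on.
    private final BlockingQueue<T> queue;
    // The next one to deliver (null = nothing fetched yet).
    private T next = null;

    public ParallelMapper(Iterator<F> input, Action<F, T> action, int threads, int queueLength) {
        super(input, action);
        // Start my pool.
        pool = Executors.newFixedThreadPool(threads);
        // And the queue (explicit type argument: the diamond operator needs Java 7).
        queue = new ArrayBlockingQueue<T>(queueLength);
    }

    class Worker implements Runnable {

        final F f;

        public Worker(F f) {
            this.f = f;
        }

        @Override
        public void run() {
            try {
                // BlockingQueue rejects null, so action.process must not return null.
                // If process throws, this item is lost (the pool only logs it).
                queue.put(action.process(f));
            } catch (InterruptedException ex) {
                // The pool is being torn down; preserve the interrupt status.
                Thread.currentThread().interrupt();
            }
        }

    }

    @Override
    public boolean hasNext() {
        while (next == null) {
            // Read this BEFORE polling: once the pool has terminated, every worker
            // has already put its result, so an empty poll afterwards means "done".
            boolean terminated = pool.isTerminated();
            if (input.hasNext()) {
                next = queue.poll();
                if (next == null) {
                    // Nothing ready yet: start a new worker. Note that this keeps
                    // reading input ahead for as long as no result is ready.
                    pool.execute(new Worker(input.next()));
                }
            } else {
                // Input exhausted - shut down the pool - unless we already have.
                if (!pool.isShutdown()) {
                    pool.shutdown();
                }
                try {
                    // Wait briefly for a result instead of spinning.
                    next = queue.poll(10, TimeUnit.MILLISECONDS);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                if (next == null && terminated) {
                    // Input empty, queue empty and the threads are finished.
                    break;
                }
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        // Works even if the caller did not call hasNext() first.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T n = next;
        // Delivered that one.
        next = null;
        return n;
    }
}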

public void test() {
    List<Integer> data = Arrays.asList(5, 4, 3, 2, 1, 0);
    System.out.println("Data");
    // data is a List and therefore already Iterable; Iterables.in is only needed for Iterators.
    for (Integer i : data) {
        System.out.println(i);
    }
    Action<Integer, Integer> action = new Action<Integer, Integer>() {

        @Override
        public Integer process(Integer f) {
            try {
                // Wait that many seconds.
                Thread.sleep(1000L * f);
            } catch (InterruptedException ex) {
                // Just give up.
            }
            // Return it unchanged.
            return f;
        }

    };
    System.out.println("Processed");
    for (Integer i : Iterables.in(new Mapper<Integer, Integer>(data.iterator(), action))) {
        System.out.println(i);
    }
    System.out.println("Parallel Processed");
    for (Integer i : Iterables.in(new ParallelMapper<Integer, Integer>(data.iterator(), action, 2, 2))) {
        System.out.println(i);
    }

}
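For the unordered version, the JDK (since Java 5, so fine on JDK 6) already pairs a thread pool with a queue of finished results: ExecutorCompletionService. The following is a minimal sketch of that alternative, not the answer's own code; CompletionMapper and maxInFlight are names made up for this example. Capping the number of submitted-but-undelivered tasks keeps only a bounded window of input and output in memory, which the question asked for, whereas the polling loop keeps submitting input while no result is ready.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Same shape as the Action interface in the answer.
interface Action<F, T> {
    T process(F f);
}

// Hypothetical unordered mapper: results are delivered in completion order.
// At most maxInFlight (>= 1) input items are taken ahead of the consumer.
class CompletionMapper<F, T> implements Iterator<T> {
    private final Iterator<F> input;
    private final Action<F, T> action;
    private final ExecutorService pool;
    private final CompletionService<T> completed;
    private final int maxInFlight;
    private int inFlight = 0; // submitted but not yet delivered

    CompletionMapper(Iterator<F> input, Action<F, T> action, int threads, int maxInFlight) {
        this.input = input;
        this.action = action;
        this.pool = Executors.newFixedThreadPool(threads);
        this.completed = new ExecutorCompletionService<T>(pool);
        this.maxInFlight = maxInFlight;
    }

    // Submit input items until maxInFlight tasks are outstanding or the input runs out.
    private void fill() {
        while (inFlight < maxInFlight && input.hasNext()) {
            final F f = input.next();
            completed.submit(new Callable<T>() {
                @Override
                public T call() {
                    return action.process(f);
                }
            });
            inFlight++;
        }
        if (inFlight == 0) {
            pool.shutdown(); // input exhausted and everything delivered
        }
    }

    @Override
    public boolean hasNext() {
        fill();
        return inFlight > 0;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        try {
            Future<T> done = completed.take(); // blocks until any task finishes
            inFlight--;
            return done.get(); // already complete, so this does not block
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

Because each result travels inside a Future, a null result or an exception thrown by process reaches the consumer instead of being silently dropped. If a client abandons the iteration early, the pool is never shut down, so a real version would probably want a close() method or daemon threads.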

Note: Iterables.in(Iterator<T>) just creates an Iterable<T> that encapsulates the passed Iterator<T>.
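The helper itself is not shown. A minimal sketch of what it might look like, assuming it is a plain static utility (the class below is hypothetical and is not part of the JDK):

```java
import java.util.Iterator;

// Hypothetical version of the answer's Iterables.in helper; not a JDK class.
final class Iterables {

    private Iterables() {
    }

    // Wraps an Iterator so it can drive a for-each loop. The result is
    // single-use: every iterator() call returns the same underlying Iterator.
    static <T> Iterable<T> in(final Iterator<T> it) {
        return new Iterable<T>() {
            @Override
            public Iterator<T> iterator() {
                return it;
            }
        };
    }
}
```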

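For your in-order one you could process a Pair<Integer,F> (each input item tagged with its sequence number) and have the workers put their results into a priority queue ordered by that number; it has to be a PriorityBlockingQueue, because a plain PriorityQueue is not thread-safe. The consumer then delivers the head of the queue only when its number is the next one expected, so results come out in input order.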
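A simpler alternative to the priority-queue idea, swapped in here rather than taken from the answer, is to keep the Futures returned by ExecutorService.submit in a FIFO in submission order and always wait on the oldest one. The sketch assumes the same Action interface; OrderedMapper and maxInFlight are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Same shape as the Action interface in the answer.
interface Action<F, T> {
    T process(F f);
}

// Hypothetical order-preserving mapper: results come out in input order.
// At most maxInFlight (>= 1) input items are taken ahead of the consumer.
class OrderedMapper<F, T> implements Iterator<T> {
    private final Iterator<F> input;
    private final Action<F, T> action;
    private final ExecutorService pool;
    private final int maxInFlight;
    // Futures in submission (= input) order; only the consuming thread touches this.
    private final Queue<Future<T>> pending = new ArrayDeque<Future<T>>();

    OrderedMapper(Iterator<F> input, Action<F, T> action, int threads, int maxInFlight) {
        this.input = input;
        this.action = action;
        this.pool = Executors.newFixedThreadPool(threads);
        this.maxInFlight = maxInFlight;
    }

    // Submit input items until maxInFlight tasks are pending or the input runs out.
    private void fill() {
        while (pending.size() < maxInFlight && input.hasNext()) {
            final F f = input.next();
            pending.add(pool.submit(new Callable<T>() {
                @Override
                public T call() {
                    return action.process(f);
                }
            }));
        }
        if (pending.isEmpty()) {
            pool.shutdown(); // input exhausted and everything delivered
        }
    }

    @Override
    public boolean hasNext() {
        fill();
        return !pending.isEmpty();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        try {
            // Wait for the OLDEST task, even if later ones have already finished.
            return pending.remove().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
```

The trade-off is head-of-line blocking: one slow item holds back later items that have already finished, but never more than maxInFlight results wait in memory, and no sequence numbers or pair class are needed.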
