异步迭代器 [英] Asynchronous Iterator

查看:160
本文介绍了异步迭代器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下的code:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

由于Iterator和任务很慢很有道理把那些到单独的线程。下面是一个Iterator包装一个快速和肮脏的尝试:

Because both iterator and task are slow it makes sense to put those into separate threads. Here is a quick and dirty attempt for an Iterator wrapper:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
        return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
  }

然而,这没有实现支持规则hasNext()。这将是确定的,当然对于规则hasNext()方法来阻止,直到它知道是否返回真还是假。我可以在我的AsyncIterator偷看对象,我可以改变规则hasNext()取一个对象从队列旁边有()返回此偷看。但是,这会导致规则hasNext()将无限期阻塞如果委托迭代器的末尾已到达。

However this implementation lacks support for "hasNext()". It would be ok of course for the hasNext() method to block until it knows whether to return true or not. I could have a peek object in my AsyncIterator and I could change hasNext() to take an object from the queue and have next() return this peek. But this would cause hasNext() to block indefinitely if the delegate iterator's end has been reached.

而不是利用ArrayBlockingQueue我当然可以做线程通信自己:

Instead of utilizing the ArrayBlockingQueue I could of course do thread communication myself:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public boolean hasNext() {
    synchronized (this) {
      while (queue.size() == 0 && !delegateDone) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      }
    }
    return queue.size() > 0;
  }

  @Override
  public T next() {
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

但是,所有额外的同步,等待和分配时通知并不真正使code的可读性更强,很容易在什么地方隐藏的竞争条件。

However all the extra synchronizations, waits and notifys don't really make the code any more readable and it is easy to hide a race condition somewhere.

任何更好的想法?

是的,我不知道常见的观察者/可观察模式。然而通常的实现不预见结束的数据流和它们不是迭代

Yes I do know about common observer/observable patterns. However the usual implementations don't foresee an end to the flow of data and they are not iterators.

我特别想要一个迭代器在这里,因为其实上述循环外部库是否存在,以及它想要一个Iterator。

I specifically want an iterator here, because actually the above mentioned loop exists in an external library and it wants an Iterator.

推荐答案

这是一个棘手的,但我想我答对了这个时候。 (我删除了我的第一个答案。)

This is a tricky one, but I think I got the right answer this time. (I deleted my first answer.)

答案是使用一个哨兵。我没有测试此code和我删除的try /接球净度:

The answer is to use a sentinel. I haven't tested this code, and I removed try/catches for clarity:

public class AsyncIterator<T> implements Iterator<T> {

    private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
    private T sentinel = (T) new Object();
    private T next;

    private AsyncIterator(final Iterator<T> delegate) {
        new Thread() {
            @Override
            public void run() {
                while (delegate.hasNext()) {
                    queue.put(delegate.next());
                }
                queue.put(sentinel);
            }
        }.start();
    }

    @Override
    public boolean hasNext() {
        if (next != null) {
            return true;
        }
        next = queue.take(); // blocks if necessary
        if (next == sentinel) {
            return false;
        }
        return true;
    }

    @Override
    public T next() {
        T tmp = next;
        next = null;
        return tmp;
    }

}

这里的观点是规则hasNext()需要阻塞,直到下一个项目已准备就绪。它还需要某种形式的退出条件,并且不能使用空队列或布尔标志,由于线程问题。前哨解决没有任何锁定或同步的问题。

The insight here is that hasNext() needs to block until the next item is ready. It also needs some kind of quit condition, and it can't use an empty queue or a boolean flag for that because of threading issues. A sentinel solves the problem without any locking or synchronization.

编辑:缓存下一步,所以规则hasNext()的调用不止一次

这篇关于异步迭代器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆