创建SingleBlockingQueue同步器 [英] Creating a SingleBlockingQueue Synchronizer

查看:151
本文介绍了创建SingleBlockingQueue同步器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图创建一个 SingleBlockingQueue< T> 同步器,允许一个线程 offer()元素,另一个线程 take()它。在 SingleBlockingQueue< T> 中每次只有一个 T 元素被保持, code> offer()如果上一个元素正在等待线程 take()它。推送线程将继续推送项目,直到它调用 setComplete(),并且接受线程将继续调用 take() isComplete()为false。如果它正在等待一个元素,线程将阻塞。

I'm trying to create a SingleBlockingQueue<T> synchronizer that allows one thread to offer() an element to it, and another thread that will take() it. Only one T element is held inside the SingleBlockingQueue<T> at a time, and the pushing thread is blocked on offer() if the previous element is waiting for the taking thread to take() it. The pushing thread will keep pushing items until it calls setComplete(), and the taking thread will keep calling take() while isComplete() is false. The taking thread will block if it is waiting for an element.

这里是我到目前为止的同步器。

Here is the synchronizer I've got so far.

import java.util.concurrent.atomic.AtomicBoolean;

public final class SingleBlockingQueue<T> {

    private volatile T value;
    private final AtomicBoolean isComplete = new AtomicBoolean(false);
    private final AtomicBoolean isPresent =  new AtomicBoolean(false);

    public void offer(T value) throws InterruptedException {
        while (isPresent.get()) {
            this.wait();
        }
        this.value = value;
        synchronized(this) {
            this.notifyAll();
        }
    }
    public boolean isComplete() {
        return !isPresent.get() && isComplete.get();
    }
    public void setComplete() {
        isComplete.set(true);
    }
    public T take() throws InterruptedException {
        while (!isPresent.get()) {
            this.wait();
        }
        T returnValue = value;
        isPresent.set(false);
        synchronized(this) {
            this.notifyAll();
        }
        return returnValue;
    }
}

这是Kotlin中的用法示例

Here is a usage example in Kotlin

    val queue = SingleBlockingQueue<Int>()

    thread {
        for (i in 1..1000) {
            queue.offer(i)
        }
        queue.setComplete()
    }

    thread {
        while (!queue.isComplete) {
            println(queue.take())
        }
    }

    Thread.sleep(100000)

但是,我得到一个错误,我有点在我的头在这一点。由于RxJava,我没有做同步器很长时间。我究竟做错什么?

However, I am getting an error and I'm a bit in over my head at this point. I haven't made synchronizers in a long time thanks to RxJava. What exactly am I doing wrong?

Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29)
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33)
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8)
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18)


推荐答案

out,你可以使用 SynchronousQueue 中的现有实现。

As others have pointed out, you could use the existing implementation in SynchronousQueue.

如果你想实现自己的,关闭,你只需要确保 wait()的调用在 synchronized 块中。

If you want to implement your own, you are pretty close, you just need to make sure the calls to wait() are inside the synchronized block.

不幸的是,我相信 isComplete() / setComplete()在您的原始代码中受到一个竞争条件,因为 setComplete()可能会在 isComplete() false 之前或甚至在读取线程正在执行 take()时。这可能会挂起阅读线程。

Unfortunately, I believe the isComplete() / setComplete() mechanism in your original code is subject to a race condition, as setComplete() may be called after isComplete() has returned false and before or even while the reading thread is executing take(). This would potentially hang the reading thread.

  public final class SingleBlockingQueue<T> {
    private final Object lock = new Object();
    private T value;
    private boolean present = false;

    public void offer(T value) throws InterruptedException {
      synchronized (lock) {
        while (present)
          lock.wait();
        this.value = value;
        present = true;
        lock.notifyAll();
      }
    }

    public T take() throws InterruptedException {
      synchronized (lock) {
        while (!present)
          lock.wait();
        T returnValue = value;
        value = null; // Should release reference
        present = false;
        lock.notifyAll();
        return returnValue;
      }
    }
  }

更自然地基于 Semaphore 条件对象实现此类队列。这是一个使用一对信号量来表示空/满状态的实现。

For comparison, it may be more natural to implement this kind of queue based on Semaphore or Condition objects. Here is an implementation using a pair of semaphores to signal empty/full conditions.

  public final class SingleBlockingQueue<T> {
    private volatile T value;
    private final Semaphore full = new Semaphore(0);
    private final Semaphore empty = new Semaphore(1);

    public void offer(T value) throws InterruptedException {
      empty.acquire();
      this.value = value;
      full.release();
    }

    public T take() throws InterruptedException {
      full.acquire();
      T returnValue = value;
      value = null; // Should release reference
      empty.release();
      return returnValue;
    }
  }

这篇关于创建SingleBlockingQueue同步器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆