使用Java在线程之间管道数据 [英] Piping data between threads with Java

查看:208
本文介绍了使用Java在线程之间管道数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个模仿电影院的多线程应用程序。涉及的每个人都是自己的线程,并发必须完全由信号量完成。我唯一的问题是如何基本上链接线程,以便他们可以通信(例如通过管道)。

I am writing a multi-threaded application that mimics a movie theater. Each person involved is its own thread and concurrency must be done completely by semaphores. The only issue I am having is how to basically link threads so that they can communicate (via a pipe for instance).

例如:

客户[1]是一个线程,获取一个信号量让它走了到票房。现在,客户[1]必须告诉Box Office Agent他们想要看电影X。然后BoxOfficeAgent [1]也是一个线程,必须检查以确保电影未满,并且要么卖票或告诉客户[1]选择另一部电影。

Customer[1] which is a thread, acquires a semaphore that lets it walk up to the Box Office. Now Customer[1] must tell the Box Office Agent that they want to see movie "X". Then BoxOfficeAgent[1] also a thread, must check to make sure the movie isn't full and either sell a ticket or tell Customer[1] to pick another movie.

如何在保持与信号量的并发性的同时来回传递数据?

How do I pass that data back and forth while still maintaining concurrency with the semaphores?

此外,我可以在java.util.concurrent中使用的唯一类是< a href =http://download.oracle.com/javase/6/docs/api/index.html?java/util/concurrent/Semaphore.html\"rel =nofollow> Semaphore 类。

Also, the only class I can use from java.util.concurrent is the Semaphore class.

推荐答案

在线程之间来回传递数据的一种简单方法是使用接口的实现 BlockingQueue< E> ,位于包 java.util.concurrent

One easy way to pass data back and forth between threads is to use the implementations of the interface BlockingQueue<E>, located in the package java.util.concurrent.

此inter faces有方法将元素添加到具有不同行为的集合中:

This interfaces has methods to add elements to the collection with different behaviors:


  • add(E):尽可能添加,否则抛出异常

  • boolean offer(E):如果元素已添加,则返回true,false否则

  • 布尔要约(E,long,TimeUnit):尝试添加元素,等待指定的时间

  • put(E):阻止调用线程,直到元素被添加为止

  • add(E): adds if possible, otherwise throws exception
  • boolean offer(E): returns true if the element has been added, false otherwise
  • boolean offer(E, long, TimeUnit): tries to add the element, waiting the specified amount of time
  • put(E): blocks the calling thread until the element has been added

它还定义了具有类似行为的元素检索方法:

It also defines methods for element retrieval with similar behaviors:


  • take( ):阻止直到有可用元素

  • poll(long,TimeUnit):检索元素或返回null

  • take(): blocks until there's an element available
  • poll(long, TimeUnit): retrieves an element or returns null

我最常使用的实现是: ArrayBlockingQueue LinkedBlockingQueue SynchronousQueue

The implementations I use most frequently are: ArrayBlockingQueue, LinkedBlockingQueue and SynchronousQueue.

第一个, ArrayBlockingQueue ,具有固定大小,由传递给其构造函数的参数定义。

The first one, ArrayBlockingQueue, has a fixed size, defined by a parameter passed to its constructor.

第二个, LinkedBlockingQueue ,具有独特的规模。它将始终接受任何元素,即 offer 将立即返回true, add 将永远不会抛出异常。

The second, LinkedBlockingQueue, has illimited size. It will always accept any elements, that is, offer will return true immediately, add will never throw an exception.

第三个,对我来说最有趣的一个, SynchronousQueue ,正是一个管道。您可以将其视为大小为0的队列。它永远不会保留一个元素:如果某个其他线程试图从中检索元素,则此队列只接受元素。相反,如果有另一个线程试图推送它,则检索操作只会返回一个元素。

The third, and to me the most interesting one, SynchronousQueue, is exactly a pipe. You can think of it as a queue with size 0. It will never keep an element: this queue will only accept elements if there's some other thread trying to retrieve elements from it. Conversely, a retrieval operation will only return an element if there's another thread trying to push it.

要实现 homework 要求只使用信号量完成同步,你可以从我给你的关于SynchronousQueue的描述中获得灵感,写一些非常相似的东西:

To fulfill the homework requirement of synchronization done exclusively with semaphores, you could get inspired by the description I gave you about the SynchronousQueue, and write something quite similar:

class Pipe<E> {
  private E e;

  private final Semaphore read = new Semaphore(0);
  private final Semaphore write = new Semaphore(1);

  public final void put(final E e) {
    write.acquire();
    this.e = e;
    read.release();
  }

  public final E take() {
    read.acquire();
    E e = this.e;
    write.release();
    return e;
  }
}

请注意,此类与我所描述的类似关于SynchronousQueue。

Notice that this class presents similar behavior to what I described about the SynchronousQueue.

一旦方法 put(E)被调用,它就会获得写信号量,这将是留空,以便对同一方法的另一次调用将阻塞其第一行。然后,此方法存储对传递的对象的引用,并释放读取的信号量。此版本将使任何调用 take()方法的线程都可以继续。

Once the methods put(E) gets called it acquires the write semaphore, which will be left empty, so that another call to the same method would block at its first line. This method then stores a reference to the object being passed, and releases the read semaphore. This release will make it possible for any thread calling the take() method to proceed.

第一步然后, take()方法自然地获取读取信号量,以便禁止任何其他线程同时检索该元素。在检索元素并将其保存在局部变量中之后(练习:如果该行,E e = this.e被删除会发生什么?),该方法释放写信号量,以便方法 put(E)可以被任何线程再次调用,并返回已保存在局部变量中的内容。

The first step of the take() method is then, naturally, to acquire the read semaphore, in order to disallow any other thread to retrieve the element concurrently. After the element has been retrieved and kept in a local variable (exercise: what would happen if that line, E e = this.e, were removed?), the method releases the write semaphore, so that the method put(E) may be called again by any thread, and returns what has been saved in the local variable.

作为一个重要的评论,请注意对正在传递的对象的引用保存在私有字段中,方法 take() put(E)都是最终。这是至关重要的,而且常常被遗漏。如果这些方法不是最终的(或者更糟糕的是,该字段不是私有的),继承类将能够改变 take()的行为。 put(E)违反合约。

As an important remark, observe that the reference to the object being passed is kept in a private field, and the methods take() and put(E) are both final. This is of utmost importance, and often missed. If these methods were not final (or worse, the field not private), an inheriting class would be able to alter the behavior of take() and put(E) breaking the contract.

最后,你可以避免在中声明一个局部变量take()方法,使用 try {} finally {} ,如下所示:

Finally, you could avoid the need to declare a local variable in the take() method by using try {} finally {} as follows:

class Pipe<E> {
  // ...
  public final E take() {
    try {
      read.acquire();
      return e;
    } finally {
      write.release();
    }
  }
}

这里,重点是例如,如果仅显示 try / finally 的使用,这在未经验的开发人员中未被注意到。显然,在这种情况下,没有真正的收获。

Here, the point of this example if just to show an use of try/finally that goes unnoticed among inexperienced developers. Obviously, in this case, there's no real gain.

哦,该死的,我大部分时间都为你完成了你的功课。在报复中 - 并且为了测试您对信号量的了解 - 为什么不实现BlockingQueue合约定义的其他一些方法呢?例如,您可以实现 offer(E)方法和 take(E,long,TimeUnit)

Oh damn, I've mostly finished your homework for you. In retribution -- and for you to test your knowledge about Semaphores --, why don't you implement some of the other methods defined by the BlockingQueue contract? For example, you could implement an offer(E) method and a take(E, long, TimeUnit)!

祝你好运。

这篇关于使用Java在线程之间管道数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆