服务中的停车线 [英] Parking threads in service

查看:74
本文介绍了服务中的停车线的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试线程驻留,并决定构建某种服务。这样的样子:

I'm experimenting with threads parking and decided to build some sort of service. Here is how it looks like:

public class TestService {
    private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles

    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}

我面临的问题是,如果我再进行测试此服务如下:

The issue I faced with was that if I then test this service as follows:

public static void main(String[] args) = {
  while(true) {
    TestService service = new TestService(2);
    service.start();
    if (!service.stop(10000, TimeUnit.MILLISECONDS))
      throw new RuntimeException();
  }
}

我有时会出现以下行为:

I sometimes got the following behavior:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
    at com.pack.age.Test$.main(Test.scala:12)
    at com.pack.age.Test.main(Test.scala)

线程在停车:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000720739a68> (a java.lang.Object)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at com.pack.age.TestService.lambda$new$0(TestService.java:27)
    at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

我在服务中没有看到公园/公园以外的比赛。此外,如果在 park 之前调用 unpark ,则 park 保证不会阻塞(这就是javadocs所说的)。

I don't see any race in park-unpark in the service. Moreover if the unpark is called before park, the park is guaranteed no to block (that's what javadocs say).

也许我误用了 LockSupport :: park 。您可以提出任何修复建议吗?

Maybe I misuse LockSupport::park. Can you suggest any fix?

推荐答案

这与记录器无关,尽管它的用法使问题浮出水面。就这么简单,您就有了比赛条件。在解释这种竞争情况之前,您需要先了解 LockSupport :: unpark 文档中的一些内容:

This has nothing to do with logger, though it's usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race condition you need to understand a few things from LockSupport::unpark documentation first:


使给定线程的许可(如果尚不可用)可用。如果线程在驻留时被阻止,则它将取消阻止。 否则,它的下一个停车请求肯定不会被阻止。

第一点是解释此处。简短的版本是:如果您有一个已经启动的线程,但是尚未称为 park ,并且在这段时间内(在线程的开始 park 之间)其他线程在第一个线程上调用 unpark :该线程根本不会驻留。许可证将立即可用。也许是这张小图可以使它更清晰:

The first point is explain here. The short version is : if you have a thread that has already been started, but has not yet called park, and within this period of time (between the start of the thread and park), some other thread calls unpark on the first one : that thread will not park, at all. The permit will be available immediately. May be this little drawing will make it more clear:

(ThreadA)  start ------------------ park --------- ....

(ThreadB)  start ----- unpark -----

注意 ThreadB 如何调用 unpark(ThreadA) ThreadA 调用 start park 。因此,当 ThreadA 到达 park 保证不会阻塞时,就像

Notice how ThreadB calls unpark(ThreadA) between the period where ThreadA has called start and park. As such, when ThreadA reaches park: it is guaranteed not to block, exactly like the documentation says.

同一文档的第二点是:


如果未启动给定线程,则不能保证此操作完全无效。

This operation is not guaranteed to have any effect at all if the given thread has not been started.

让我们通过图形来看看:

Let's see that via a drawing:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

ThreadA 调用 park ,它将永久挂起,因为 ThreadB 再也不会在其上调用 unpark 了。请注意,在之前 ThreadA 调用了 unpark (与前面的示例不同) )。

After ThreadA calls park, it will hang forever, since ThreadB never calls unpark on it again. Notice that the call to unpark was made before ThreadA has started (unlike the previous example).

这正是您所遇到的情况:

And this is exactly what happens in your case:

LockSupport。 unpark(w); (来自 unparkWorkers )在之前 t.start()被称为; 来自 public void start(){...} 。用简单的话来说-您的代码在两个工人 开始之前都调用 unpark 例如,当他们最终到达停车-被卡住时,没有人能够取消停车。您使用 logger 而不是 System :: out 看到此事实的原因很可能与面部有关那当你 println 时-引擎盖下有一个 synchronized 方法。

LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); from public void start(){...}. In simpler words - your code calls unpark on both workers before they even start, as such when they ultimately reach park - they are stuck, no one is able to unpark them. The fact that you see this with a logger and not with System::out has most probably to do with the face that when you println - there is a synchronized method under the hood.

事实上, LockSupport 提供了准确的语义需要证明这一点。为此,我们需要(为简单起见: SOProblem service = new SOProblem(1);

As a matter of fact, LockSupport offers exactly the semantics needed to prove this. For this we need (for simplicity : SOProblem service = new SOProblem(1);)

static class ParkBlocker {

    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}

现在我们需要将其插入适当的方法中。首先标记一个事实,我们调用了 unpark

And now we need to insert this in proper methods. First flag that fact that we have called unpark:

private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}

然后在循环结束后重置标志:

Then reset the flag after a cycle has ended:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}

然后更改构造函数以标记线程已开始:

Then change the constructor to flag that the thread has started:

  .....
  while (!stopped) {
       logger.debug("Parking " + Thread.currentThread().getName());
       // flag the fact that thread has started. add "2", meaning
       // thread has started
       int y = pb.x;
       y = y + 2;
       pb.x = y;
       LockSupport.park(pb);
       logger.debug(Thread.currentThread().getName() + " unparked");
  }

然后,当线程冻结时,您需要检查标志:

Then, when your thread freezes you need to inspect the flag:

 public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}

其中 debug 方法是:

public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}

当问题重现时,您致电取消停车 之前,您调用了停车,这发生在 x = 3 作为输出。

When the issue re-produces, you have called unpark before you called park, that happens when x = 3 as the output.

这篇关于服务中的停车线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆