服务中的停车线 [英] Parking threads in service
问题描述
我正在尝试线程驻留,并决定构建某种服务。这样的样子:
I'm experimenting with threads parking and decided to build some sort of service. Here is how it looks like:
public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles
private final CountDownLatch stopLatch;
private final Object parkBlocker = new Object();
private volatile boolean stopped;
private final Thread[] workers;
public TestService(int parallelizm) {
stopLatch = new CountDownLatch(parallelizm);
workers = new Thread[parallelizm];
for (int i = 0; i < parallelizm; i++) {
workers[i] = new Thread(() -> {
try {
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
LockSupport.park(parkBlocker);
logger.debug(Thread.currentThread().getName() + " unparked");
}
} finally {
stopLatch.countDown();
}
});
}
}
public void start() {
Arrays.stream(workers).forEach(t -> {
t.start();
logger.debug(t.getName() + " started");
});
}
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
this.stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
}
return stoppedSuccefully;
}
private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
}
}
我面临的问题是,如果我再进行测试此服务如下:
The issue I faced with was that if I then test this service as follows:
public static void main(String[] args) = {
while(true) {
TestService service = new TestService(2);
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS))
throw new RuntimeException();
}
}
我有时会出现以下行为:
I sometimes got the following behavior:
14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
at com.pack.age.Test$.main(Test.scala:12)
at com.pack.age.Test.main(Test.scala)
线程在停车:
"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000720739a68> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.pack.age.TestService.lambda$new$0(TestService.java:27)
at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
我在服务中没有看到公园/公园以外的比赛。此外,如果在 park
之前调用 unpark
,则 park
保证不会阻塞(这就是javadocs所说的)。
I don't see any race in park-unpark in the service. Moreover if the unpark
is called before park
, the park
is guaranteed no to block (that's what javadocs say).
也许我误用了 LockSupport :: park
。您可以提出任何修复建议吗?
Maybe I misuse LockSupport::park
. Can you suggest any fix?
推荐答案
这与记录器无关,尽管它的用法使问题浮出水面。就这么简单,您就有了比赛条件。在解释这种竞争情况之前,您需要先了解 LockSupport :: unpark
文档中的一些内容:
This has nothing to do with logger, though it's usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race condition you need to understand a few things from LockSupport::unpark
documentation first:
使给定线程的许可(如果尚不可用)可用。如果线程在驻留时被阻止,则它将取消阻止。 否则,它的下一个停车请求肯定不会被阻止。
第一点是解释此处。简短的版本是:如果您有一个已经启动的线程
,但是尚未称为 park
,并且在这段时间内(在线程的开始
和 park
之间)其他线程在第一个线程上调用 unpark
:该线程根本不会驻留。许可证将立即可用。也许是这张小图可以使它更清晰:
The first point is explain here. The short version is : if you have a thread
that has already been started, but has not yet called park
, and within this period of time (between the start
of the thread and park
), some other thread calls unpark
on the first one : that thread will not park, at all. The permit will be available immediately. May be this little drawing will make it more clear:
(ThreadA) start ------------------ park --------- ....
(ThreadB) start ----- unpark -----
注意 ThreadB
如何调用 unpark(ThreadA)
在 ThreadA
调用 start
和 park
。因此,当 ThreadA
到达 park
:保证不会阻塞时,就像
Notice how ThreadB
calls unpark(ThreadA)
between the period where ThreadA
has called start
and park
. As such, when ThreadA
reaches park
: it is guaranteed not to block, exactly like the documentation says.
同一文档的第二点是:
如果未启动给定线程,则不能保证此操作完全无效。
This operation is not guaranteed to have any effect at all if the given thread has not been started.
让我们通过图形来看看:
Let's see that via a drawing:
Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park
在 ThreadA
调用 park $ c之后$ c>,它将永久挂起,因为
ThreadB
再也不会在其上调用 unpark
了。请注意,在之前 ThreadA
调用了 unpark
(与前面的示例不同) )。
After ThreadA
calls park
, it will hang forever, since ThreadB
never calls unpark
on it again. Notice that the call to unpark
was made before ThreadA
has started (unlike the previous example).
这正是您所遇到的情况:
And this is exactly what happens in your case:
LockSupport。 unpark(w);
(来自 unparkWorkers
)在之前 t.start()被称为;
来自 public void start(){...}
。用简单的话来说-您的代码在两个工人
开始之前都调用 unpark
例如,当他们最终到达停车
-被卡住时,没有人能够取消停车
。您使用 logger
而不是 System :: out
看到此事实的原因很可能与面部有关那当你 println
时-引擎盖下有一个 synchronized
方法。
LockSupport.unpark(w);
(from unparkWorkers
) is called before t.start();
from public void start(){...}
. In simpler words - your code calls unpark
on both workers
before they even start, as such when they ultimately reach park
- they are stuck, no one is able to unpark
them. The fact that you see this with a logger
and not with System::out
has most probably to do with the face that when you println
- there is a synchronized
method under the hood.
事实上, LockSupport
提供了准确的语义需要证明这一点。为此,我们需要(为简单起见: SOProblem service = new SOProblem(1);
)
As a matter of fact, LockSupport
offers exactly the semantics needed to prove this. For this we need (for simplicity : SOProblem service = new SOProblem(1);
)
static class ParkBlocker {
private volatile int x;
public ParkBlocker(int x) {
this.x = x;
}
public int getX() {
return x;
}
}
现在我们需要将其插入适当的方法中。首先标记一个事实,我们调用了 unpark
:
And now we need to insert this in proper methods. First flag that fact that we have called unpark
:
private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
/*
* add "1" to whatever there is already in pb.x, meaning
* we have done unparking _also_
*/
int y = pb.x;
y = y + 1;
pb.x = y;
}
然后在循环结束后重置标志:
Then reset the flag after a cycle has ended:
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
// reset the flag
pb.x = 0;
}
return stoppedSuccefully;
}
然后更改构造函数以标记线程已开始:
Then change the constructor to flag that the thread has started:
.....
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
// flag the fact that thread has started. add "2", meaning
// thread has started
int y = pb.x;
y = y + 2;
pb.x = y;
LockSupport.park(pb);
logger.debug(Thread.currentThread().getName() + " unparked");
}
然后,当线程冻结时,您需要检查标志:
Then, when your thread freezes you need to inspect the flag:
public static void main(String[] args) throws InterruptedException {
while (true) {
SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
service.debug();
throw new RuntimeException();
}
}
}
其中 debug
方法是:
public void debug() {
Arrays.stream(workers)
.forEach(x -> {
ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
if (pb != null) {
System.out.println("x = " + pb.getX());
}
});
}
当问题重现时,您致电取消停车
之前,您调用了停车
,这发生在 x = 3
作为输出。
When the issue re-produces, you have called unpark
before you called park
, that happens when x = 3
as the output.
这篇关于服务中的停车线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!