可重置CountdownLatch [英] Resettable CountdownLatch

查看:407
本文介绍了可重置CountdownLatch的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要一个直接等价于 CountDownLatch 的东西,但是可以重置(保持线程安全!)。我不能使用经典同步构造,因为他们根本不工作在这种情况下(复杂的锁定问题)。目前,我创建了许多 CountDownLatch 对象,每个对象替换了前一个对象。我相信这是在年轻一代在GC(由于绝对数量的对象)。你可以看到使用下面的锁存器的代码(它是ns-3网络模拟器接口的 java.net mock的一部分)。



有些想法可能是尝试 CyclicBarrier (JDK5 +)或 Phaser (JDK7) p>

我可以测试代码,并找回找到这个问题的解决方案的任何人,因为我是唯一可以将它插入运行系统以查看会发生什么: )

  / ** 
*
* /
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/ **
* KSelector
* @version 1.0
* @author Chris Dennett
* /
public class KSelector extends SelectorImpl {
//如果此选择器已关闭,则为true
private volatile boolean closed = false;

//锁定关闭和清除
final class CloseLock {}
private final Object closeLock = new CloseLock();

private volatile boolean selection = false;
private volatile boolean wakeup = false;

类SocketListener实现KKSSocketListener {
protected volatile CountDownLatch latch = null;

/ **
*
* /
public SocketListener(){
newLatch();
}

protected synchronized CountDownLatch newLatch(){
return this.latch = new CountDownLatch(1);
}

protected synchronized void refreshReady(KKSSocket socket){
if(!selection)return;

synchronized(socketToChannel){
SelChImpl ch = socketToChannel.get(socket);
if(ch == null){
System.out.println(ks sendCB:channel not found for socket:+ socket);
return;
}
synchronized(channelToKey){
SelectionKeyImpl sk = channelToKey.get(ch);
if(sk!= null){
if(handleSelect(sk)){
latch.countDown();
}
}
}
}
}
@Override
public void connectionSucceeded(KKSSocket socket){
refreshReady );
}
@Override
public void connectionFailed(KKSSocket socket){
refreshReady(socket);
}
@Override
public void dataSent(KKSSocket socket,long bytesSent){
refreshReady(socket);
}
@Override
public void sendCB(KKSSocket socket,long bytesAvailable){
refreshReady(socket);
}
@Override
public void onRecv(KKSSocket socket){
refreshReady(socket);
}
@Override
public void newConnectionCreated(KKSSocket socket,KKSSocket newSocket,KKSAddress remoteaddress){
refreshReady(socket);
}
@Override
public void normalClose(KKSSocket socket){
wakeup();
}
@Override
public void errorClose(KKSSocket socket){
wakeup();
}
}

protected final Map< KKSSocket,SelChImpl> socketToChannel = new HashMap< KKSSocket,SelChImpl>();
protected final Map< SelChImpl,SelectionKeyImpl> channelToKey = new HashMap< SelChImpl,SelectionKeyImpl>();
protected final SocketListener currListener = new SocketListener();
protected Thread selectionThread = null;

SelChImpl getChannelForSocket(KKSSocket s){
synchronized(socketToChannel){
return socketToChannel.get(s);
}
}

SelectionKeyImpl getSelKeyForChannel(KKSSocket s){
synchronized(channelToKey){
return channelToKey.get(s)
}
}

protected boolean markRead(SelectionKeyImpl impl){
synchronized(impl){
if(!impl.isValid())return false ;
impl.nioReadyOps(impl.readyOps()| SelectionKeyImpl.OP_READ);
return selectedKeys.add(impl);
}
}

protected boolean markWrite(SelectionKeyImpl impl){
synchronized(impl){
if(!impl.isValid())return false ;
impl.nioReadyOps(impl.readyOps()| SelectionKeyImpl.OP_WRITE);
return selectedKeys.add(impl);
}
}

protected boolean markAccept(SelectionKeyImpl impl){
synchronized(impl){
if(!impl.isValid())return false ;
impl.nioReadyOps(impl.readyOps()| SelectionKeyImpl.OP_ACCEPT);
return selectedKeys.add(impl);
}
}

protected boolean markConnect(SelectionKeyImpl impl){
synchronized(impl){
if(!impl.isValid())return false ;
impl.nioReadyOps(impl.readyOps()| SelectionKeyImpl.OP_CONNECT);
return selectedKeys.add(impl);
}
}

/ **
* @param provider
* /
protected KSelector(SelectorProvider provider){
super(provider);
}

/ *(非Javadoc)
* @see kokunet.SelectorImpl#implClose()
* /
@Override
protected void implClose()throws IOException {
provider()。getApp()。printMessage(implClose:closed:+ closed);
synchronized(closeLock){
if(closed)return;
closed = true;
for(SelectionKey sk:keys){
provider()。getApp()。printMessage(dereg1);
deregister((AbstractSelectionKey)sk);
provider()。getApp()。printMessage(dereg2);
SelectableChannel selch = sk.channel();
if(!selch.isOpen()&&!selch.isRegistered())
((SelChImpl)selch).kill
}
implCloseInterrupt();
}
}

protected void implCloseInterrupt(){
wakeup();
}

private boolean handleSelect(SelectionKey k){
synchronized(k){
boolean notify = false;

if(!k.isValid()){
k.cancel();
((SelectionKeyImpl)k).channel.socket()。removeListener(currListener);
return false;
}

SelectionKeyImpl ski =(SelectionKeyImpl)k;

if((ski.interestOps()& SelectionKeyImpl.OP_READ)!= 0){
if(ski.channel.socket()。getRxAvailable()> 0){
notify | = markRead(ski);
}
}

if((ski.interestOps()& SelectionKeyImpl.OP_WRITE)!= 0){
if(ski.channel.socket .getTxAvailable()> 0){
notify | = markWrite(ski);
}
}

if((ski.interestOps()& SelectionKeyImpl.OP_CONNECT)!= 0){
if(!ski.channel.socket ).isConnectionless()){
IConnectionSocket cs =(IConnectionSocket)ski.channel.socket();
if(!ski.channel.socket()。isAccepting()&&!cs.isConnecting()&&!cs.isConnected()){
notify | = markConnect );
}
}
}

if((ski.interestOps()& SelectionKeyImpl.OP_ACCEPT)!= 0){
// provider ).getApp()。printMessage(accept check:ski:+ ski +,connectionless:+ ski.channel.socket()。isConnectionless()+,listening:+ ski.channel.socket isList(); isListener()+,hasPendingConn:+(ski.channel.socket()。isConnectionless()?nope!:((IConnectionSocket)ski.channel.socket
if(!ski.channel.socket()。isConnectionless()&&&& amp;&&&&&&&& ();
if(cs.hasPendingConnections()){
notify | = markAccept(ski);
}
}
}
return notify;
}
}

private boolean handleSelect(){
boolean notify = false;

//获取初始状态
for(SelectionKey k:keys){
notify | = handleSelect(k);
}

return notify;
}

/ *(非Javadoc)
* @see kokunet.SelectorImpl#doSelect(long)
* /
@Override
protected int doSelect(long timeout)throws IOException {
processDeregisterQueue();

long timestartedms = System.currentTimeMillis();

synchronized(selectedKeys){
synchronized(currListener){
wakeup = false;
selectionThread = Thread.currentThread();
selection = true;
}
try {
handleSelect();

if(!selectedKeys.isEmpty()|| timeout == 0){
return selectedKeys.size();
}

// TODO:无用的操作如果我们有键可用
(SelectionKey键:keys){
((SelectionKeyImpl)键).channel.socket ().addListener(currListener);
}
try {
while(!wakeup&& isOpen()&&&" selectedKeys.isEmpty()){
CountDownLatch latch = null;
synchronized(currListener){
if(wakeup ||!isOpen()||!selectedKeys.isEmpty()){
break;
}
latch = currListener.newLatch();
}
try {
if(timeout> 0){
long currtimems = System.currentTimeMillis();
long remainingMS =(timestartedms + timeout) - currtimems;

if(remainingMS> 0){
latch.await(remainingMS,Tim​​eUnit.MILLISECONDS);
} else {
break;
}
} else {
latch.await();
}
} catch(InterruptedException e){

}
}
return selectedKeys.size();
} finally {
for(SelectionKey key:keys){
((SelectionKeyImpl)key).channel.socket()。removeListener(currListener);
}
}
} finally {
synchronized(currListener){
selection = false;
选择Thread= null;
wakeup = false;
}
}
}
}

/ *(非Javadoc)
* @see kokunet.SelectorImpl#implRegister(kokunet。 SelectionBegin(){
synchronized(closeLock){
if(closed)throw NewChildSelectorException(););
* /
@Override
protected void implRegister
synchronized(channelToKey){
synchronized(socketToChannel){
keys.add(ski);
socketToChannel.put(ski.channel.socket(),ski.channel);
channelToKey.put(ski.channel,ski);
}
}
}

}

/ *(非Javadoc)
* @see kokunet.SelectorImpl# implDereg(kokunet.SelectionKeyImpl)
* /
@Override
protected void implDereg(SelectionKeyImpl ski)throws IOException {
synchronized(channelToKey){
synchronized(socketToChannel){
keys.remove(ski);
socketToChannel.remove(ski.channel.socket());
channelToKey.remove(ski.channel);

SelectableChannel selch = ski.channel();

if(!selch.isOpen()&&!selch.isRegistered())
((SelChImpl)selch).kill
}
}
}

/ *(非Javadoc)
* @see kokunet.SelectorImpl#wakeup()
*
@Override
public Selector wakeup(){
synchronized(currListener){
if(选择){
wakeup = true;
selection = false;
selectionThread.interrupt();
选择Thread= null;
}
}
return this;
}
}

干杯,

Chris

解决方案

我复制了 CountDownLatch ,并实施了 ()方法将内部同步类重置为其初始状态(开始计数):)显示工作正常。没有更多不必要的对象创建\o /不可能子类化,因为 sync 是私有的。 Boo。

  import java.util.concurrentCyclicBarrier; 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/ **
*同步辅助,允许一个或多个线程等待,直到
*在其他线程中执行的一组操作完成。
*
*< p> A {@code CountDownLatch}使用给定的< em>计数< / em>初始化。
* {@link #await await}方法阻塞,直到当前计数由于调用{@link #countDown}方法而达到
*零,此后
*所有等待的线程释放和任何后续调用
* {@link #await await}立即返回。这是一次性的现象
* - 计数不能重置。如果你需要一个版本来重置
*计数,请考虑使用{@link CyclicBarrier}。
*
*< p> A {@code CountDownLatch}是一个通用的同步工具
*,可用于多种用途。用$ 1初始化的
* {@code CountDownLatch}用作
*简单的开/关锁存器,或门:所有线程调用{@link #await await}
* wait在门口,直到它被调用{@link
* #countDown}的线程打开。已初始化为< em> N< / em>的{@code CountDownLatch}
*可用于使一个线程等待,直到< em> N< / em>线程有
*完成了一些动作,或者一些动作已经完成了N次。
*
*< p> {@code CountDownLatch}的一个有用的属性是它
*不需要调用{@code countDown}的线程等待
*计数在继续之前达到零,它阻止任何
*线程通过{@link #await await},直到所有
*线程可以通过。
*
*< p>< b>样品用量:< / b>这里是一对类,其中工作线程的组
*使用两个倒计数锁存器:
*< ul>
*< li>第一个是启动信号,防止任何工人进行
*,直到驾驶员准备好继续进行;
*< li>第二个是完成信号,允许驾驶员等待
*直到所有工人完成。
*< / ul>
*
*< pre>
* class Driver {// ...
* void main()throws InterruptedException {
* CountDownLatch startSignal = new CountDownLatch(1);
* CountDownLatch doneSignal = new CountDownLatch(N);
*
* for(int i = 0; i * new Thread(new Worker(startSignal,doneSignal))。start ();
*
* doSomethingElse(); //不让运行
* startSignal.countDown(); //让所有线程继续
* doSomethingElse();
* doneSignal.await(); //等待所有完成
*}
*}
*
*类Worker实现Runnable {
* private final CountDownLatch startSignal;
* private final CountDownLatch doneSignal;
* Worker(CountDownLatch startSignal,CountDownLatch doneSignal){
* this.startSignal = startSignal;
* this.doneSignal = doneSignal;
*}
* public void run(){
* try {
* startSignal.await();
* doWork();
* doneSignal.countDown();
*} catch(InterruptedException ex){} // return;
*}
*
* void doWork(){...}
*}
*
*< / pre>另一个典型的用法是将问题分成N个部分,
*用每个部分描述每个部分,Runnable执行那个部分,
*倒计数在锁存器上,并将所有Runnables排队到
* Executor。当所有子部件完成时,协调线程
*将能够通过等待。 (当线程必须重复
*以这种方式倒计时,而不是使用{@link CyclicBarrier}。)
*
*< pre>
* class Driver2 {// ...
* void main()throws InterruptedException {
* CountDownLatch doneSignal = new CountDownLatch(N);
* Executor e = ...
*
* for(int i = 0; i * e。 execute(new WorkerRunnable(doneSignal,i));
*
* doneSignal.await(); //等待所有完成
*}
*}
*
*类WorkerRunnable实现Runnable {
* private final CountDownLatch doneSignal;
* private final int i
* WorkerRunnable(CountDownLatch doneSignal,int i){
* this.doneSignal = doneSignal;
* this.i = i;
*}
* public void run(){
* try {
* doWork(i);
* doneSignal.countDown();
*} catch(InterruptedException ex){} // return;
*}
*
* void doWork(){...}
*}
*
*< / pre>
*
*< p>内存一致性效应:调用$ b之前线程中的操作$ b * {@code countDown()}
*< a href =package- summary.html#MemoryVisibility>< i> happens-before< / i>< / a>
*在另一个线程中从相应的
* {@code await()}成功返回后的操作。
*
* @since 1.5
* @author Doug Lea
* /
public class ResettableCountDownLatch {
/ **
*同步控制对于CountDownLatch。
*使用AQS状态来表示计数。
* /
私有静态最终类同步扩展AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

public final int startCount;

Sync(int count){
this.startCount = count;
setState(startCount);
}

int getCount(){
return getState();
}

public int tryAcquireShared(int acquire){
return getState()== 0? 1:-1;
}

public boolean tryReleaseShared(int releases){
//递减计数;当转换为零时信号
for(;;){
int c = getState();
if(c == 0)
return false;
int nextc = c-1;
if(compareAndSetState(c,nextc))
return nextc == 0;
}
}

public void reset(){
setState(startCount);
}
}

私人最终同步同步;

/ **
*构造一个用给定计数初始化的{@code CountDownLatch}。
*
* @param计数{@link #countDown}必须调用的次数
*在线程可以通过之前{@link #await}
* @throws IllegalArgumentException如果{@code count}为负
* /
public ResettableCountDownLatch(int count){
if(count< 0)throw new IllegalArgumentException(count< 0);
this.sync = new Sync(count);
}

/ **
*导致当前线程等待,直到锁存器计数到
*为零,除非线程是{@linkplain Thread#中断中断}。
*
*< p>如果当前计数为零,则此方法立即返回。
*
*< p>如果当前计数大于零,则当前
*线程对于线程调度目的被禁用,并且位于
* dormant,直到两个事件之一发生:
*< ul>
*< li>由于调用
* {@link #countDown}方法,计数到达零;或
*< li>一些其他线程{@linkplain线程#中断中断}
*当前线程。
*< / ul>
*
*< p>如果当前线程:
*< ul>
*< li>在进入该方法时设置其中断状态;或
*< li>是在等待期间{@linkplain线程#中断中断},
*< / ul>
*然后{@link InterruptedException}被抛出,当前线程的
*中断状态被清除。
*
* @throws InterruptedException如果当前线程被中断
*等待时
* /
public void await()throws InterruptedException {
sync。 acquireSharedInterruptably(1);
}

public void reset(){
sync.reset();
}

/ **
*导致当前线程等待,直到锁存器计数到
*为零,除非线程是{@linkplain Thread#中断中断},
*或指定的等待时间耗尽。
*
*< p>如果当前计数为零,则此方法立即返回
*,值为{@code true}。
*
*< p>如果当前计数大于零,则当前
*线程对于线程调度目的被禁用,并且位于
* dormant,直到三个事件发生:
*< ul>
*< li>由于调用
* {@link #countDown}方法,计数到达零;或
*< li>一些其他线程{@linkplain线程#中断中断}
*当前线程;或
*< li>指定的等待时间已过。
*< / ul>
*
*< p>如果计数达到零,则该方法返回
*值{@code true}。
*
*< p>如果当前线程:
*< ul>
*< li>在进入该方法时设置其中断状态;或
*< li>是在等待期间{@linkplain线程#中断中断},
*< / ul>
*然后{@link InterruptedException}被抛出,当前线程的
*中断状态被清除。
*
*< p>如果指定的等待时间过去,则返回值{@code false}
*。如果时间小于或等于零,方法
*将不会等待。
*
* @param timeout最大等待时间
* @param单位{@code timeout}参数的时间单位
* @return {@code true} if计数达到零,并且如果计数到达之前等待时间已过,则{@code false}
*
* @throws如果当前线程中断则抛出InterruptedException
*等待时
* /
public boolean await(long timeout,TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1,unit.toNanos(timeout));
}

/ **
*减少锁存器的计数,如果
*计数到零,释放所有等待的线程。
*
*< p>如果当前计数大于零,则递减。
*如果新计数为零,那么所有等待的线程将重新启用
*线程调度目的。
*
*< p>如果当前计数等于零,则什么也不发生。
* /
public void countDown(){
sync.releaseShared(1);
}

/ **
*返回当前计数。
*
*< p>此方法通常用于调试和测试目的。
*
* @返回当前计数
* /
public long getCount(){
return sync.getCount();
}

/ **
*返回标识此锁存器的字符串及其状态。
*括号中的状态包括字符串{@codeCount =}
*,后跟当前计数。
*
* @返回一个标识此锁存器及其状态的字符串
* /
public String toString(){
return super.toString() [Count =+ sync.getCount()+];
}
}


I need something which is directly equivalent to CountDownLatch, but is resettable (remaining thread-safe!). I can't use classic synchronisation constructs as they simply don't work in this situation (complex locking issues). At the moment, I'm creating many CountDownLatch objects, each replacing the previous one. I believe this is doing in the young generation in the GC (due to the sheer number of objects). You can see the code which uses the latches below (it's part of the java.net mock for a ns-3 network simulator interface).

Some ideas might be to try CyclicBarrier (JDK5+) or Phaser (JDK7)

I can test code and get back to anyone that finds a solution to this problem, since I'm the only one who can insert it into the running system to see what happens :)

/**
 *
 */
package kokunet;

import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kokuks.IConnectionSocket;
import kokuks.KKSAddress;
import kokuks.KKSSocket;
import kokuks.KKSSocketListener;

/**
 * KSelector
 * @version 1.0
 * @author Chris Dennett
 */
public class KSelector extends SelectorImpl {
    // True if this Selector has been closed
    private volatile boolean closed = false;

    // Lock for close and cleanup
    final class CloseLock {}
    private final Object closeLock = new CloseLock();

    private volatile boolean selecting = false;
    private volatile boolean wakeup = false;

    class SocketListener implements KKSSocketListener {
        protected volatile CountDownLatch latch = null;

        /**
         *
         */
        public SocketListener() {
            newLatch();
        }

        protected synchronized CountDownLatch newLatch() {
            return this.latch = new CountDownLatch(1);
        }

        protected synchronized void refreshReady(KKSSocket socket) {
            if (!selecting) return;

            synchronized (socketToChannel) {
                SelChImpl ch = socketToChannel.get(socket);
                if (ch == null) {
                    System.out.println("ks sendCB: channel not found for socket: " + socket);
                    return;
                }
                synchronized (channelToKey) {
                    SelectionKeyImpl sk = channelToKey.get(ch);
                    if (sk != null) {
                        if (handleSelect(sk)) {
                            latch.countDown();
                        }
                    }
                }
            }
        }
        @Override
        public void connectionSucceeded(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void connectionFailed(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void dataSent(KKSSocket socket, long bytesSent) {
            refreshReady(socket);
        }
        @Override
        public void sendCB(KKSSocket socket, long bytesAvailable) {
            refreshReady(socket);
        }
        @Override
        public void onRecv(KKSSocket socket) {
            refreshReady(socket);
        }
        @Override
        public void newConnectionCreated(KKSSocket socket, KKSSocket newSocket, KKSAddress remoteaddress) {
            refreshReady(socket);
        }
        @Override
        public void normalClose(KKSSocket socket) {
            wakeup();
        }
        @Override
        public void errorClose(KKSSocket socket) {
            wakeup();
        }
    }

    protected final Map<KKSSocket, SelChImpl>        socketToChannel = new HashMap<KKSSocket, SelChImpl>();
    protected final Map<SelChImpl, SelectionKeyImpl> channelToKey    = new HashMap<SelChImpl, SelectionKeyImpl>();
    protected final SocketListener currListener = new SocketListener();
    protected Thread selectingThread = null;

    SelChImpl getChannelForSocket(KKSSocket s) {
        synchronized (socketToChannel) {
            return socketToChannel.get(s);
        }
    }

    SelectionKeyImpl getSelKeyForChannel(KKSSocket s) {
        synchronized (channelToKey) {
            return channelToKey.get(s);
        }
    }

    protected boolean markRead(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_READ);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markWrite(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_WRITE);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markAccept(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_ACCEPT);
            return selectedKeys.add(impl);
        }
    }

    protected boolean markConnect(SelectionKeyImpl impl) {
        synchronized (impl) {
            if (!impl.isValid()) return false;
            impl.nioReadyOps(impl.readyOps() | SelectionKeyImpl.OP_CONNECT);
            return selectedKeys.add(impl);
        }
    }

    /**
     * @param provider
     */
    protected KSelector(SelectorProvider provider) {
        super(provider);
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implClose()
     */
    @Override
    protected void implClose() throws IOException {
        provider().getApp().printMessage("implClose: closed: " + closed);
        synchronized (closeLock) {
            if (closed) return;
            closed = true;
            for (SelectionKey sk : keys) {
                provider().getApp().printMessage("dereg1");
                deregister((AbstractSelectionKey)sk);
                provider().getApp().printMessage("dereg2");
                SelectableChannel selch = sk.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
            implCloseInterrupt();
        }
    }

    protected void implCloseInterrupt() {
        wakeup();
    }

    private boolean handleSelect(SelectionKey k) {
        synchronized (k) {
            boolean notify = false;

            if (!k.isValid()) {
                k.cancel();
                ((SelectionKeyImpl)k).channel.socket().removeListener(currListener);
                return false;
            }

            SelectionKeyImpl ski = (SelectionKeyImpl)k;

            if ((ski.interestOps() & SelectionKeyImpl.OP_READ) != 0) {
                if (ski.channel.socket().getRxAvailable() > 0) {
                    notify |= markRead(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_WRITE) != 0) {
                if (ski.channel.socket().getTxAvailable() > 0) {
                    notify |= markWrite(ski);
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_CONNECT) != 0) {
                if (!ski.channel.socket().isConnectionless()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (!ski.channel.socket().isAccepting() && !cs.isConnecting() && !cs.isConnected()) {
                        notify |= markConnect(ski);
                    }
                }
            }

            if ((ski.interestOps() & SelectionKeyImpl.OP_ACCEPT) != 0) {
                //provider().getApp().printMessage("accept check: ski: " + ski + ", connectionless: " + ski.channel.socket().isConnectionless() + ", listening: " + ski.channel.socket().isListening() + ", hasPendingConn: " + (ski.channel.socket().isConnectionless() ? "nope!" : ((IConnectionSocket)ski.channel.socket()).hasPendingConnections()));
                if (!ski.channel.socket().isConnectionless() && ski.channel.socket().isListening()) {
                    IConnectionSocket cs = (IConnectionSocket)ski.channel.socket();
                    if (cs.hasPendingConnections()) {
                        notify |= markAccept(ski);
                    }
                }
            }
            return notify;
        }
    }

    private boolean handleSelect() {
        boolean notify = false;

        // get initial status
        for (SelectionKey k : keys) {
            notify |= handleSelect(k);
        }

        return notify;
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#doSelect(long)
     */
    @Override
    protected int doSelect(long timeout) throws IOException {
        processDeregisterQueue();

        long timestartedms = System.currentTimeMillis();

        synchronized (selectedKeys) {
            synchronized (currListener) {
                wakeup = false;
                selectingThread = Thread.currentThread();
                selecting = true;
            }
            try {
                handleSelect();

                if (!selectedKeys.isEmpty() || timeout == 0) {
                    return selectedKeys.size();
                }

                //TODO: useless op if we have keys available
                for (SelectionKey key : keys) {
                    ((SelectionKeyImpl)key).channel.socket().addListener(currListener);
                }
                try {
                    while (!wakeup && isOpen() && selectedKeys.isEmpty()) {
                        CountDownLatch latch = null;
                        synchronized (currListener) {
                            if (wakeup || !isOpen() || !selectedKeys.isEmpty()) {
                                break;
                            }
                            latch = currListener.newLatch();
                        }
                        try {
                            if (timeout > 0) {
                                long currtimems = System.currentTimeMillis();
                                long remainingMS = (timestartedms + timeout) - currtimems;

                                if (remainingMS > 0) {
                                    latch.await(remainingMS, TimeUnit.MILLISECONDS);
                                } else {
                                    break;
                                }
                            } else {
                                latch.await();
                            }
                        } catch (InterruptedException e) {

                        }
                    }
                    return selectedKeys.size();
                } finally {
                    for (SelectionKey key : keys) {
                        ((SelectionKeyImpl)key).channel.socket().removeListener(currListener);
                    }
                }
            } finally {
                synchronized (currListener) {
                    selecting = false;
                    selectingThread = null;
                    wakeup = false;
                }
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implRegister(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implRegister(SelectionKeyImpl ski) {
        synchronized (closeLock) {
            if (closed) throw new ClosedSelectorException();
            synchronized (channelToKey) {
                synchronized (socketToChannel) {
                    keys.add(ski);
                    socketToChannel.put(ski.channel.socket(), ski.channel);
                    channelToKey.put(ski.channel, ski);
                }
            }
        }

    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#implDereg(kokunet.SelectionKeyImpl)
     */
    @Override
    protected void implDereg(SelectionKeyImpl ski) throws IOException {
        synchronized (channelToKey) {
            synchronized (socketToChannel) {
                keys.remove(ski);
                socketToChannel.remove(ski.channel.socket());
                channelToKey.remove(ski.channel);

                SelectableChannel selch = ski.channel();

                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
            }
        }
    }

    /* (non-Javadoc)
     * @see kokunet.SelectorImpl#wakeup()
     */
    @Override
    public Selector wakeup() {
        synchronized (currListener) {
            if (selecting) {
                wakeup = true;
                selecting = false;
                selectingThread.interrupt();
                selectingThread = null;
            }
        }
        return this;
    }
}

Cheers,
Chris

解决方案

I copied CountDownLatch and implemented a reset() method that resets the internal Sync class to its initial state (starting count) :) Appears to work fine. No more unnecessary object creation \o/ It was not possible to subclass because sync was private. Boo.

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * A synchronization aid that allows one or more threads to wait until
 * a set of operations being performed in other threads completes.
 *
 * <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
 * The {@link #await await} methods block until the current count reaches
 * zero due to invocations of the {@link #countDown} method, after which
 * all waiting threads are released and any subsequent invocations of
 * {@link #await await} return immediately.  This is a one-shot phenomenon
 * -- the count cannot be reset.  If you need a version that resets the
 * count, consider using a {@link CyclicBarrier}.
 *
 * <p>A {@code CountDownLatch} is a versatile synchronization tool
 * and can be used for a number of purposes.  A
 * {@code CountDownLatch} initialized with a count of one serves as a
 * simple on/off latch, or gate: all threads invoking {@link #await await}
 * wait at the gate until it is opened by a thread invoking {@link
 * #countDown}.  A {@code CountDownLatch} initialized to <em>N</em>
 * can be used to make one thread wait until <em>N</em> threads have
 * completed some action, or some action has been completed N times.
 *
 * <p>A useful property of a {@code CountDownLatch} is that it
 * doesn't require that threads calling {@code countDown} wait for
 * the count to reach zero before proceeding, it simply prevents any
 * thread from proceeding past an {@link #await await} until all
 * threads could pass.
 *
 * <p><b>Sample usage:</b> Here is a pair of classes in which a group
 * of worker threads use two countdown latches:
 * <ul>
 * <li>The first is a start signal that prevents any worker from proceeding
 * until the driver is ready for them to proceed;
 * <li>The second is a completion signal that allows the driver to wait
 * until all workers have completed.
 * </ul>
 *
 * <pre>
 * class Driver { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch startSignal = new CountDownLatch(1);
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       new Thread(new Worker(startSignal, doneSignal)).start();
 *
 *     doSomethingElse();            // don't let run yet
 *     startSignal.countDown();      // let all threads proceed
 *     doSomethingElse();
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class Worker implements Runnable {
 *   private final CountDownLatch startSignal;
 *   private final CountDownLatch doneSignal;
 *   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
 *      this.startSignal = startSignal;
 *      this.doneSignal = doneSignal;
 *   }
 *   public void run() {
 *      try {
 *        startSignal.await();
 *        doWork();
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Another typical usage would be to divide a problem into N parts,
 * describe each part with a Runnable that executes that portion and
 * counts down on the latch, and queue all the Runnables to an
 * Executor.  When all sub-parts are complete, the coordinating thread
 * will be able to pass through await. (When threads must repeatedly
 * count down in this way, instead use a {@link CyclicBarrier}.)
 *
 * <pre>
 * class Driver2 { // ...
 *   void main() throws InterruptedException {
 *     CountDownLatch doneSignal = new CountDownLatch(N);
 *     Executor e = ...
 *
 *     for (int i = 0; i < N; ++i) // create and start threads
 *       e.execute(new WorkerRunnable(doneSignal, i));
 *
 *     doneSignal.await();           // wait for all to finish
 *   }
 * }
 *
 * class WorkerRunnable implements Runnable {
 *   private final CountDownLatch doneSignal;
 *   private final int i;
 *   WorkerRunnable(CountDownLatch doneSignal, int i) {
 *      this.doneSignal = doneSignal;
 *      this.i = i;
 *   }
 *   public void run() {
 *      try {
 *        doWork(i);
 *        doneSignal.countDown();
 *      } catch (InterruptedException ex) {} // return;
 *   }
 *
 *   void doWork() { ... }
 * }
 *
 * </pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to calling
 * {@code countDown()}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions following a successful return from a corresponding
 * {@code await()} in another thread.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ResettableCountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        public final int startCount;

        Sync(int count) {
            this.startCount = count;
            setState(startCount);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

        public void reset() {
             setState(startCount);
        }
    }

    private final Sync sync;

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public ResettableCountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
     *
     * <p>If the current count is zero then this method returns immediately.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of two things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void reset() {
        sync.reset();
    }

    /**
     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.
     *
     * <p>If the current count is zero then this method returns immediately
     * with the value {@code true}.
     *
     * <p>If the current count is greater than zero then the current
     * thread becomes disabled for thread scheduling purposes and lies
     * dormant until one of three things happen:
     * <ul>
     * <li>The count reaches zero due to invocations of the
     * {@link #countDown} method; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     *
     * <p>If the count reaches zero then the method returns with the
     * value {@code true}.
     *
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then the value {@code false}
     * is returned.  If the time is less than or equal to zero, the method
     * will not wait at all.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if the count reached zero and {@code false}
     *         if the waiting time elapsed before the count reached zero
     * @throws InterruptedException if the current thread is interrupted
     *         while waiting
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    /**
     * Returns the current count.
     *
     * <p>This method is typically used for debugging and testing purposes.
     *
     * @return the current count
     */
    public long getCount() {
        return sync.getCount();
    }

    /**
     * Returns a string identifying this latch, as well as its state.
     * The state, in brackets, includes the String {@code "Count ="}
     * followed by the current count.
     *
     * @return a string identifying this latch, as well as its state
     */
    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

这篇关于可重置CountdownLatch的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆