Understanding Jetty's "Closed while Pending/Unready" warning

Question

We have an asynchronous servlet which produces the following warning log from Jetty:

java.io.IOException: Closed while Pending/Unready

After enabling debug logs I got the following stacktrace:

WARN [jetty-25948] (HttpOutput.java:278)   - java.io.IOException: Closed while Pending/Unready
DEBUG [jetty-25948] (HttpOutput.java:279)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:277) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:488) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:708) [jetty-util.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:626) [jetty-util.jar:9.4.8.v20171121]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

It did not help much.

The warning comes only after Jetty calls the onTimeout() method of our AsyncListener. QA sometimes could reproduce it by using kill -9 on the client side application.

How can I reproduce this warning with a sample servlet-client code? I would like to understand this issue in a simpler environment than our production code to be able to fix the production one afterwards. How should a sample servlet behave? Is it possible to reproduce that with an Apache Commons HttpClient client side in the same JUnit test? (That would be great for writing an integration test without complicated network hacking, like kill -9.)

I have tried a few things to implement a sample async servlet and client, without success. I don't think that attaching this code would help much, but I can do so if anyone is interested.

Jetty version: 9.4.8.v20171121

Update (2018-06-27):

Reflecting on @Joakim Erdfelt's helpful answer: I have not found any close() call in our code, but I did find a suspicious piece of missing synchronization. Here is the base of our async poll servlet:

public class QueuePollServlet extends HttpServlet {

    public QueuePollServlet() {
    }

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(30_000);
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }

    private static class QueueWriteListener implements AsyncListener, WriteListener {

        private final AsyncContext asyncContext;
        private final ServletOutputStream output;

        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }

        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }

        private synchronized void writeImpl() throws IOException {
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }

        private void completeImpl() {
            asyncContext.complete();
        }

        public void dataArrived() {
            try {
                writeImpl();
            } catch (IOException e) {
                ...
            }
        }

        public void noMoreBuffers() {
            completeImpl();
        }

        @Override
        public void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }

        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }


        ...
    }
}

Possible race condition:

  1. DataFeederThread: calls dataArrived() -> writeImpl(), then it gets that output.isReady() is true.
  2. Jetty calls onTimeout() which completes the context.
  3. DataFeederThread: calls output.write() in the while loop, but finds the context already completed.

Could this scenario cause the Closed while Pending/Unready warning, or is it another issue? Am I right that making completeImpl() synchronized solves the problem, or is there something else to care about?

Update (2018-06-28):

We also have a similar onError implementation in QueueWriteListener, shown in the following snippet:

@Override
public void onError(final Throwable t) {
    logger.error("Writer.onError", t);
    completeImpl();
}

Anyway, there is no onError error log around the Closed while Pending/Unready log messages (looking at a two-hour window around each), just EOF exceptions like the following ones from our DataFeederThread:

DEBUG [DataFeederThread] (HttpOutput.java:224) - 
org.eclipse.jetty.io.EofException: null
        at org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:704) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:668) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:526) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:778) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:834) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:234) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:218) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.flush(HttpOutput.java:392) [jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()


DEBUG [DataFeederThread] (QueuePollServlet.java:217) - messageArrived exception
org.eclipse.jetty.io.EofException: Closed
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:476) ~[jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()

Recommended answer

It is possible to reproduce the Closed while Pending/Unready warning with some manual debugging and a plain curl client. I have tested it with Jetty 9.4.8.v20171121 and Jetty 9.4.11.v20180605. You need two breakpoints, so it's not easy to reliably reproduce in a test environment.

  1. The first breakpoint is in the HttpOutput.write() method right after it changes its state from READY to PENDING:

case READY:
    if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
        continue;
    // put a breakpoint here

  2. The second one is in Response.closeOutput():

    case STREAM:
        // put a breakpoint here
        if (!_out.isClosed())
            getOutputStream().close();
        break;
    

  3. Steps to reproduce:

    1. [JettyThread1] Calls QueueWriteListener.onWritePossible() which writes a few bytes to the output then returns (as its input buffer is empty).
    2. Wait for the onTimeout event.
    3. [JettyThread2] HttpChannelState.onTimeout() calls QueueWriteListener.onTimeout() which calls asyncContext.complete().
    4. [JettyThread2] HttpChannelState.onTimeout() schedules a dispatch after the async timeout.
    5. [JettyThread2] Pauses at the second breakpoint
    6. [DFT] DataFeederThread calls writeImpl()
    7. [DFT] DataFeederThread calls HttpOutput.write() (it's the output stream)
    8. [DFT] HttpOutput.write() changes its state from READY to PENDING
    9. [DFT] DataFeederThread pauses here, due to the breakpoint above
    10. [JettyThread2] The scheduled dispatch closes the output stream and produces the Closed while Pending/Unready warning.
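
The interleaving in the steps above can be imitated without a debugger in a toy model. The sketch below is not Jetty code: `ToyOutput`, its three states, and the latch names are assumptions made for illustration, with the latches standing in for the two breakpoints. It only shows that a close arriving while a write sits in PENDING is the condition that yields the message.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Toy model only: NOT Jetty's HttpOutput. It keeps just enough state to show the interleaving.
class PendingCloseModel {

    enum State { READY, PENDING, CLOSED }

    static final class ToyOutput {
        final AtomicReference<State> state = new AtomicReference<>(State.READY);
        final CountDownLatch writePending = new CountDownLatch(1); // stands in for breakpoint 1
        final CountDownLatch closeDone = new CountDownLatch(1);    // lets the writer resume

        void write() throws InterruptedException {
            if (state.compareAndSet(State.READY, State.PENDING)) {
                writePending.countDown(); // "paused" right after READY -> PENDING
                closeDone.await();        // resumed only after close() has run
                state.compareAndSet(State.PENDING, State.READY); // no-op if already CLOSED
            }
        }

        // Returns the warning text if the close raced with an in-flight write, else null.
        String close() {
            State previous = state.getAndSet(State.CLOSED);
            closeDone.countDown();
            return previous == State.PENDING ? "Closed while Pending/Unready" : null;
        }
    }

    static String reproduce() {
        ToyOutput output = new ToyOutput();
        Thread feeder = new Thread(() -> {
            try {
                output.write();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "DataFeederThread");
        feeder.start();
        try {
            output.writePending.await();     // wait until the writer is in PENDING
            String warning = output.close(); // the "scheduled dispatch" closes the output
            feeder.join();
            return warning;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

Because the latches force the ordering, the model hits the PENDING-then-close path on every run, which is what the two breakpoints achieve by hand against the real server.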

    So it is actually Jetty that closes the output stream, on this stack (Jetty 9.4.8.v20171121):

    Thread [jetty-19] (Suspended)   
        Response.closeOutput() line: 1043   
        HttpChannelOverHttp(HttpChannel).handle() line: 488 
        HttpChannelOverHttp(HttpChannel).run() line: 293    
        QueuedThreadPool.runJob(Runnable) line: 708 
        QueuedThreadPool$2.run() line: 626  
        Thread.run() line: 748  
    

    Making onTimeout() synchronized in the listener (with writeImpl() synchronized as well) does not help, since the scheduled close can still overlap with writeImpl() called from DataFeederThread. Consider this case:

    1. [JettyThread1] Calls QueueWriteListener.onWritePossible() which writes a few bytes to the output then returns (as its input buffer is empty).
    2. Wait for the onTimeout event.
    3. [JettyThread2] HttpChannelState.onTimeout() calls QueueWriteListener.onTimeout() which calls asyncContext.complete().
    4. [DFT] DataFeederThread calls writeImpl() (it's blocked since onTimeout has not finished yet)
    5. [JettyThread2] QueueWriteListener.onTimeout() finishes
    6. [DFT] writeImpl() can run
    7. [JettyThread2] HttpChannelState.onTimeout() schedules a dispatch after the async timeout.
    8. [JettyThread2] Pauses at the second breakpoint
    9. [DFT] DataFeederThread calls HttpOutput.write() (it's the output stream)
    10. [DFT] HttpOutput.write() changes its state from READY to PENDING
    11. [DFT] DataFeederThread pauses here, due to the breakpoint above
    12. [JettyThread2] The scheduled dispatch closes the output stream and produces the Closed while Pending/Unready warning.
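
The point of this second scenario is that the listener's monitor covers only the listener's own methods, not the close that the container performs afterwards. A minimal sketch of that lock-scope gap follows; `Listener`, `writeInFlight`, and the latches are hypothetical names, and the "container close" is reduced to a read of a flag made without taking the listener's lock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: models lock scope, not the servlet API.
class LockScopeSketch {

    static final class Listener {
        final AtomicBoolean writeInFlight = new AtomicBoolean();
        final CountDownLatch inWrite = new CountDownLatch(1);
        final CountDownLatch closed = new CountDownLatch(1);

        synchronized void onTimeout() {
            // the real listener would call asyncContext.complete() here
        }

        synchronized void writeImpl() throws InterruptedException {
            writeInFlight.set(true);
            inWrite.countDown();
            closed.await(); // the write is still in flight while the container closes
            writeInFlight.set(false);
        }
    }

    // Returns true if the container-side close observed a write in flight.
    static boolean closeOverlapsWrite() {
        Listener listener = new Listener();
        listener.onTimeout(); // returns and releases the monitor
        Thread feeder = new Thread(() -> {
            try {
                listener.writeImpl(); // free to enter: onTimeout() is already done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "DataFeederThread");
        feeder.start();
        try {
            listener.inWrite.await();
            // Container-side close: runs without the listener's monitor.
            boolean overlapped = listener.writeInFlight.get();
            listener.closed.countDown();
            feeder.join();
            return overlapped;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(closeOverlapsWrite());
    }
}
```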

    Unfortunately, after asyncContext.complete() it is not enough to check output.isReady(). It returns true because Jetty reopens the HttpOutput (see the stack below), so you need a separate flag for that in the listener.

    Thread [jetty-13] (Suspended (access of field _state in HttpOutput))    
        HttpOutput.reopen() line: 195   
        HttpOutput.recycle() line: 923  
        Response.recycle() line: 138    
        HttpChannelOverHttp(HttpChannel).recycle() line: 269    
        HttpChannelOverHttp.recycle() line: 83  
        HttpConnection.onCompleted() line: 424  
        HttpChannelOverHttp(HttpChannel).onCompleted() line: 695    
        HttpChannelOverHttp(HttpChannel).handle() line: 493 
        HttpChannelOverHttp(HttpChannel).run() line: 293    
        QueuedThreadPool.runJob(Runnable) line: 708 
        QueuedThreadPool$2.run() line: 626  
        Thread.run() line: 748  
    

    Furthermore, isReady() also returns true when the output is still closed (before recycle/reopen). Related question: isReady() returns true in closed state - why?
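
Why a separate flag is needed can be shown with a stub whose isReady() always answers true, as described above for the closed and reopened states. This is a minimal sketch under that assumption: `StubOutput` and `GuardedWriter` are hypothetical stand-ins, not servlet API classes, and the real listener would also call asyncContext.complete() where noted.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: StubOutput is a hypothetical stand-in, not a servlet API class.
class CompletedFlagSketch {

    // Mimics the behaviour described above: isReady() keeps returning true after completion.
    static final class StubOutput {
        final List<String> written = new ArrayList<>();

        boolean isReady() {
            return true;
        }

        void write(String message) {
            written.add(message);
        }
    }

    static final class GuardedWriter {
        private final StubOutput output;
        private boolean completed; // guarded by "this"

        GuardedWriter(StubOutput output) {
            this.output = output;
        }

        // Returns true if the message was written, false if it was skipped.
        synchronized boolean write(String message) {
            if (completed || !output.isReady()) {
                return false; // the flag, not isReady(), stops writes after completion
            }
            output.write(message);
            return true;
        }

        synchronized void complete() {
            completed = true; // the real listener would also call asyncContext.complete() here
        }
    }

    public static void main(String[] args) {
        StubOutput output = new StubOutput();
        GuardedWriter writer = new GuardedWriter(output);
        System.out.println(writer.write("first"));  // true
        writer.complete();
        System.out.println(writer.write("second")); // false
        System.out.println(output.written);         // [first]
    }
}
```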

    The final implementation looks something like this:

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setBufferSize(4096);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(5_000); // millis
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }
    
    private static class QueueWriteListener implements AsyncListener, WriteListener {
    
        private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);
    
        private final AsyncContext asyncContext;
        private final ServletOutputStream output;
    
        @GuardedBy("this")
        private boolean completed = false;
    
        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }
    
        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }
    
        private synchronized void writeImpl() throws IOException {
            if (completed) {
                return;
            }
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }
    
        private synchronized void completeImpl() {
            // also stops DataFeederThread from calling bufferArrived
            completed = true;
            asyncContext.complete();
        }
    
        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }
    
        public void dataArrived() {
            try {
                writeImpl();
            } catch (RuntimeException | IOException e) {
                ...
            }
        }
    
        public void noMoreData() {
            completeImpl();
        }
    
        @Override
        public synchronized void onComplete(final AsyncEvent event) throws IOException {
            completed = true; // might not be needed, but does not hurt
        }
    
        @Override
        public synchronized void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }
    
        @Override
        public void onError(final AsyncEvent event) throws IOException {
            logger.error("onError", event.getThrowable());
        }
    
        ...
    }
    

    Update 2018-08-01: Actually, it did not fix the warning completely; see: "Closed while Pending/Unready" warnings from Jetty
