Servlet-3 Async Context, how to do asynchronous writes?


Question

The Servlet-3.0 API allows you to detach a request/response context and answer it later.

However, if I try to write a large amount of data, something like:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush();

It may actually block - and it does block in trivial test cases - on both Tomcat 7 and Jetty 8. The tutorials recommend creating a thread pool to handle such a setup, which generally runs counter to a traditional 10K architecture.

However, if I have 10,000 open connections and a thread pool of, let's say, 10 threads, it is enough for even 1% of the clients to have low-speed connections, or simply stalled connections, to block the thread pool and either completely block the comet response or slow it down significantly.
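The starvation effect described above can be reproduced without a servlet container. The following is a minimal sketch using only java.util.concurrent; the class name PoolStarvationDemo, the method healthyClientStarved, and the 300 ms wait are illustrative assumptions, and a CountDownLatch stands in for a write that blocks on a stalled client.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PoolStarvationDemo {

    /**
     * Returns true if a write for a healthy client cannot complete within
     * 300 ms because every pool thread is stuck on a stalled client.
     * (Hypothetical helper; the latch simulates a blocking out.flush().)
     */
    static boolean healthyClientStarved(int poolSize, int stalledClients) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch unblock = new CountDownLatch(1);
        try {
            // Each "stalled client" occupies a worker thread until released.
            for (int i = 0; i < stalledClients; i++) {
                pool.submit(() -> { unblock.await(); return null; });
            }
            // The healthy client's write is queued behind the stalled ones.
            Future<String> healthy = pool.submit(() -> "delivered");
            try {
                healthy.get(300, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                return true;
            }
        } finally {
            unblock.countDown(); // release the simulated stalled writes
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool=1,  stalled=1:  starved=" + healthyClientStarved(1, 1));
        System.out.println("pool=10, stalled=10: starved=" + healthyClientStarved(10, 10));
        System.out.println("pool=2,  stalled=1:  starved=" + healthyClientStarved(2, 1));
    }
}
```

As soon as the number of stalled writers reaches the pool size, every other client waits, regardless of how many connections are otherwise healthy.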

The expected practice is to get a "write-ready" notification or an I/O completion notification and then continue to push the data.

How can this be done using the Servlet-3.0 API, i.e. how do I get either:


  • Asynchronous completion notification for an I/O operation.
  • Non-blocking I/O with a write-ready notification.
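For comparison, this is roughly what a write-ready notification looks like outside the Servlet API, in plain java.nio. It is only a sketch of the pattern being asked for, not something Servlet-3.0 exposes. A java.nio.channels.Pipe stands in for the client socket so the example is self-contained, and the class name WriteReadyDemo, the method pump, and the 1024-byte "slow client" buffer are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class WriteReadyDemo {

    /**
     * Pushes totalBytes through a non-blocking pipe on a single thread.
     * Returns {bytesDelivered, timesTheSinkWasNotWritable}.
     */
    static long[] pump(int totalBytes) throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source();
             Selector selector = Selector.open()) {

            sink.configureBlocking(false);
            source.configureBlocking(false);
            // Ask the selector to report when the sink can accept more data.
            SelectionKey writeKey = sink.register(selector, SelectionKey.OP_WRITE);

            ByteBuffer pending = ByteBuffer.allocate(totalBytes); // the "big data"
            ByteBuffer clientBuf = ByteBuffer.allocate(1024);     // a slow reader
            long delivered = 0;
            long notWritable = 0;

            while (delivered < totalBytes) {
                if (pending.hasRemaining()) {
                    selector.selectNow();
                    if (selector.selectedKeys().remove(writeKey)) {
                        // Write-ready: write whatever fits; never blocks.
                        sink.write(pending);
                    } else {
                        // Not ready: the thread is free to serve other clients.
                        notWritable++;
                    }
                }
                // The simulated client drains a small chunk per iteration.
                clientBuf.clear();
                int n = source.read(clientBuf);
                if (n > 0) {
                    delivered += n;
                }
            }
            return new long[] { delivered, notWritable };
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = pump(1 << 20);
        System.out.println("delivered=" + r[0] + " bytes, sink not writable " + r[1] + " times");
    }
}
```

The point of the pattern is that a full output buffer results in "not ready" rather than a blocked thread, so one thread can multiplex many slow connections. How often the sink reports "not writable" depends on the platform's pipe buffering.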

If this is not supported by the Servlet-3.0 API, are there any web-server-specific APIs (like Jetty Continuation or Tomcat CometEvent) that allow handling such events truly asynchronously, without faking asynchronous I/O using a thread pool?

Does anybody know?

And if this is not possible, can you confirm it with a reference to documentation?

I have attached the code below, which emulates an event stream.

Notes:

  • it uses a ServletOutputStream, which throws IOException, to detect disconnected clients
  • it sends keep-alive messages to make sure clients are still there
  • I created a thread pool to "emulate" asynchronous operations.
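For reference, the frames sent over a text/event-stream connection follow a simple line-based format: an event is a set of field lines (such as id: and data:) terminated by a blank line, and a line starting with a colon is a comment that clients ignore, which makes it usable as a keep-alive. The helper below is a small sketch of that framing; the class name SseFrames and its methods are hypothetical and not part of the code in this question.

```java
final class SseFrames {

    /** A comment-only frame; clients ignore it, so it works as a keep-alive. */
    static String keepAlive() {
        return ":keep-alive\n\n";
    }

    /** One event with an id and one data: line per line of the payload. */
    static String event(long id, String payload) {
        StringBuilder sb = new StringBuilder();
        sb.append("id:").append(id).append('\n');
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        sb.append('\n'); // blank line terminates the event
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(event(7, "first line\nsecond line"));
        System.out.print(keepAlive());
    }
}
```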

In this example I explicitly defined a thread pool of size 1 to show the problem:


  • Start the application
  • Run curl http://localhost:8080/path/to/app from two terminals (twice)
  • Now send the data with curl -d m=message http://localhost:8080/path/to/app
  • Both clients receive the data
  • Now suspend one of the clients (Ctrl+Z) and send the message once again with curl -d m=message http://localhost:8080/path/to/app
  • Observe that the other, non-suspended client either receives nothing or, after the message has been transferred, stops receiving keep-alive messages because the only pool thread is blocked.

I want to solve this problem without using a thread pool, because with 1000-5000 open connections I can exhaust the thread pool very quickly.

The sample code is below.

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
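            // event-stream fields are "name:value" lines, so the id needs a colon and a newline
            sb.append("id:");
            sb.append(id);
            sb.append("\n");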
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

The sample above uses threads to prevent blocking... However, if the number of blocking clients is larger than the size of the thread pool, it will block.

How could it be implemented without blocking?

Recommended answer

I've found the Servlet 3.0 Asynchronous API tricky to implement correctly, and helpful documentation is sparse. After a lot of trial and error and trying many different approaches, I was able to find a robust solution that I've been very happy with. When I look at my code and compare it to yours, I notice one major difference that may help you with your particular problem: I use a ServletResponse to write the data and not a ServletOutputStream.

Here is my go-to asynchronous servlet class, adapted slightly for your some_big_data case:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Response object from context is null! (nothing to worry about.)"); // just means the context had already timed out; the timeout listener was already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}
