Servlet 3.0 AsyncContext: how to make asynchronous writes?

Description of the problem

The Servlet 3.0 API makes it possible to detach the request/response context from the servlet thread and respond to it later.

However, if I try to write a large amount of data, something like:

    AsyncContext ac = getWaitingContext();
    ServletOutputStream out = ac.getResponse().getOutputStream();
    out.print(some_big_data);
    out.flush();

It can block — and it does block in trivial test cases — on both Tomcat 7 and Jetty 8. The tutorials recommend creating a thread pool to handle such a setup, which is generally counter-productive for a traditional C10K architecture.

However, if I have 10,000 open connections and a thread pool of, say, 10 threads, even 1% of clients with slow connections, or simply blocked connections, can tie up the thread pool and completely block the comet response, or slow it down significantly.
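The starvation effect described in the paragraph above can be reproduced in plain Java, independent of any servlet container: with a single-thread pool, one task stuck on a slow "client" prevents every other task from running. This is a minimal sketch; the CountDownLatch stands in for a blocked socket write, and all names here are illustrative.

```java
import java.util.concurrent.*;

public class PoolStarvation {
    static String demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1); // like the 1-thread demo pool
        CountDownLatch slowClient = new CountDownLatch(1);      // stands in for a blocked write

        // First task: blocks indefinitely, like a write to a suspended client.
        pool.submit(() -> {
            try { slowClient.await(); } catch (InterruptedException e) { }
        });
        // Second task: would complete instantly, but never gets a thread.
        Future<String> fastClient = pool.submit(() -> "delivered");

        String status;
        try {
            // The "fast" client sees nothing while the single thread is stuck.
            status = fastClient.get(200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            status = "starved";
        }
        slowClient.countDown(); // release the blocked task so the pool can shut down
        pool.shutdown();
        return status;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "starved"
    }
}
```

With a pool of 10 threads the arithmetic is the same: 10 suspended clients suffice to stop delivery to the remaining thousands.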

The expected practice is to receive a write-readiness notification, or an I/O completion notification, and then continue to push the data.

How can this be done using the Servlet-3.0 API, that is, how can I get:

  • Notification of completion of an asynchronous I/O operation.
  • Non-blocking I/O with a write-readiness notification.

If this is not supported by the Servlet 3.0 API, are there any web-server-specific APIs (e.g. Jetty Continuation or Tomcat CometEvent) that allow these events to be handled truly asynchronously, without faking asynchronous I/O using a thread pool?

Does anyone know?

And if this is not possible, can you confirm it with a link to the documentation?

Demonstration of the problem in the sample code

I have added the code below that emulates an event stream.

Notes:

  • it uses a ServletOutputStream that throws an IOException to detect disconnected clients
  • sends keep-alive messages to make sure clients are still there
  • I created a thread pool to "emulate" asynchronous operations.

In this example, I explicitly defined a thread pool of size 1 to show the problem:

  • Launch the application
  • Run curl http://localhost:8080/path/to/app from two terminals
  • Now push data with curl -d m=message http://localhost:8080/path/to/app
  • Both clients receive the data
  • Now suspend one of the clients (Ctrl+Z) and send a message again with curl -d m=message http://localhost:8080/path/to/app
  • Note that the other, non-suspended client either receives nothing or, after the message is transferred, stops receiving keep-alives, because the only thread is blocked

I want to solve this problem without using a thread pool, because with 1000-5000 open connections I could exhaust the threads very quickly.

Sample code below.




    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.LinkedBlockingQueue;
    import javax.servlet.AsyncContext;
    import javax.servlet.ServletConfig;
    import javax.servlet.ServletException;
    import javax.servlet.annotation.WebServlet;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import javax.servlet.ServletOutputStream;

    @WebServlet(urlPatterns = "", asyncSupported = true)
    public class HugeStreamWithThreads extends HttpServlet {

        private long id = 0;
        private String message = "";
        private final ThreadPoolExecutor pool =
            new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>());
            // it is deliberately small for demonstration purposes

        private final Thread timer = new Thread(new Runnable() {
            public void run() {
                try {
                    while(true) {
                        Thread.sleep(1000);
                        sendKeepAlive();
                    }
                } catch(InterruptedException e) {
                    // exit
                }
            }
        });

        class RunJob implements Runnable {
            volatile long lastUpdate = System.nanoTime();
            long id = 0;
            AsyncContext ac;

            RunJob(AsyncContext ac) {
                this.ac = ac;
            }

            public void keepAlive() {
                if(System.nanoTime() - lastUpdate > 1000000000L)
                    pool.submit(this);
            }

            String formatMessage(String msg) {
                StringBuilder sb = new StringBuilder();
                sb.append("id");
                sb.append(id);
                for(int i=0;i<100000;i++) {
                    sb.append("data:");
                    sb.append(msg);
                    sb.append("\n");
                }
                sb.append("\n");
                return sb.toString();
            }

            public void run() {
                String message = null;
                synchronized(HugeStreamWithThreads.this) {
                    if(this.id != HugeStreamWithThreads.this.id) {
                        this.id = HugeStreamWithThreads.this.id;
                        message = HugeStreamWithThreads.this.message;
                    }
                }
                if(message == null)
                    message = ":keep-alive\n\n";
                else
                    message = formatMessage(message);
                if(!sendMessage(message))
                    return;
                boolean once_again = false;
                synchronized(HugeStreamWithThreads.this) {
                    if(this.id != HugeStreamWithThreads.this.id)
                        once_again = true;
                }
                if(once_again)
                    pool.submit(this);
            }

            boolean sendMessage(String message) {
                try {
                    ServletOutputStream out = ac.getResponse().getOutputStream();
                    out.print(message);
                    out.flush();
                    lastUpdate = System.nanoTime();
                    return true;
                } catch(IOException e) {
                    ac.complete();
                    removeContext(this);
                    return false;
                }
            }
        };

        private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

        @Override
        public void init(ServletConfig config) throws ServletException {
            super.init(config);
            timer.start();
        }

        @Override
        public void destroy() {
            for(;;){
                try {
                    timer.interrupt();
                    timer.join();
                    break;
                } catch(InterruptedException e) {
                    continue;
                }
            }
            pool.shutdown();
            super.destroy();
        }

        protected synchronized void removeContext(RunJob ac) {
            asyncContexts.remove(ac);
        }

        // GET method is used to establish a stream connection
        @Override
        protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            // Content-Type header
            response.setContentType("text/event-stream");
            response.setCharacterEncoding("utf-8");
            // Access-Control-Allow-Origin header
            response.setHeader("Access-Control-Allow-Origin", "*");
            final AsyncContext ac = request.startAsync();
            ac.setTimeout(0);
            RunJob job = new RunJob(ac);
            asyncContexts.add(job);
            if(id!=0) {
                pool.submit(job);
            }
        }

        private synchronized void sendKeepAlive() {
            for(RunJob job : asyncContexts) {
                job.keepAlive();
            }
        }

        // POST method is used to communicate with the server
        @Override
        protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
                throws ServletException, IOException {
            request.setCharacterEncoding("utf-8");
            id++;
            message = request.getParameter("m");
            for(RunJob job : asyncContexts) {
                pool.submit(job);
            }
        }
    }

The example above uses threads to prevent blocking... However, if the number of blocking clients is larger than the size of the thread pool, it would block.

How can this be implemented without blocking?

+46
java comet
Aug 23 '12 at 5:27
6 answers

I have found the Servlet 3.0 asynchronous API tricky to implement correctly, and helpful documentation to be sparse. After a lot of trial and error, and trying many different approaches, I was able to find a reliable solution that I have been very happy with. When I look at my code and compare it to yours, I notice one major difference that may help you with your particular problem: I use the ServletResponse to write the data, not the ServletOutputStream.

Here is my go-to asynchronous servlet class, adapted slightly for your some_big_data case:

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import javax.servlet.AsyncContext;
    import javax.servlet.AsyncEvent;
    import javax.servlet.AsyncListener;
    import javax.servlet.ServletConfig;
    import javax.servlet.ServletException;
    import javax.servlet.ServletResponse;
    import javax.servlet.annotation.WebInitParam;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import javax.servlet.http.HttpSession;
    import org.apache.log4j.Logger;

    @javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true,
            initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
    public class AsyncServlet extends HttpServlet {

        private static final Logger logger = Logger.getLogger(AsyncServlet.class);

        public static final int CALLBACK_TIMEOUT = 10000; // ms

        /** executor service */
        private ExecutorService exec;

        @Override
        public void init(ServletConfig config) throws ServletException {
            super.init(config);
            int size = Integer.parseInt(getInitParameter("threadpoolsize"));
            exec = Executors.newFixedThreadPool(size);
        }

        @Override
        public void service(HttpServletRequest req, HttpServletResponse res)
                throws ServletException, IOException {
            final AsyncContext ctx = req.startAsync();
            final HttpSession session = req.getSession();
            // set the timeout
            ctx.setTimeout(CALLBACK_TIMEOUT);
            // attach listener to respond to lifecycle events of this AsyncContext
            ctx.addListener(new AsyncListener() {
                @Override
                public void onComplete(AsyncEvent event) throws IOException {
                    logger.info("onComplete called");
                }
                @Override
                public void onTimeout(AsyncEvent event) throws IOException {
                    logger.info("onTimeout called");
                }
                @Override
                public void onError(AsyncEvent event) throws IOException {
                    logger.info("onError called: " + event.toString());
                }
                @Override
                public void onStartAsync(AsyncEvent event) throws IOException {
                    logger.info("onStartAsync called");
                }
            });
            enqueLongRunningTask(ctx, session);
        }

        /**
         * if something goes wrong in the task, it simply causes a timeout condition
         * that causes the async context listener to be invoked (after the fact)
         * <p/>
         * if the {@link AsyncContext#getResponse()} is null, that means this context
         * has already timed out (and the context listener has been invoked).
         */
        private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    String some_big_data = getSomeBigData();
                    try {
                        ServletResponse response = ctx.getResponse();
                        if (response != null) {
                            response.getWriter().write(some_big_data);
                            ctx.complete();
                        } else {
                            throw new IllegalStateException(); // this is caught below
                        }
                    } catch (IllegalStateException ex) {
                        logger.error("Request object from context is null! (nothing to worry about.)");
                        // just means the context had already timed out and the timeout listener was already called
                    } catch (Exception e) {
                        logger.error("ERROR IN AsyncServlet", e);
                    }
                }
            });
        }

        /** destroy the executor */
        @Override
        public void destroy() {
            exec.shutdown();
        }
    }
+28
Aug 28 '12 at 11:17

During my research on this topic, this question kept coming up, so I figured I would mention it here:

Servlet 3.1 introduced asynchronous operations on ServletInputStream and ServletOutputStream. See ServletOutputStream.setWriteListener.

An example can be found at http://docs.oracle.com/javaee/7/tutorial/servlets013.htm
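The WriteListener contract boils down to: write only while isReady() returns true, then return and wait for the container to call onWritePossible() again. Below is a container-free sketch of that state machine, using mock classes in place of ServletOutputStream and WriteListener — the class names, the budget mechanism, and the chunk sizes are all illustrative, not part of the Servlet API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class NonBlockingWriteSketch {

    // Mimics ServletOutputStream's isReady()/write() pair: accepts data only
    // while the (simulated) socket buffer has room.
    static class MockOutputStream {
        final StringBuilder wire = new StringBuilder(); // bytes "sent" so far
        int budget;                                     // chunks that still fit in the buffer
        Runnable onWritePossible;                       // invoked when the buffer drains

        MockOutputStream(int budget) { this.budget = budget; }

        boolean isReady() { return budget > 0; }

        void write(String chunk) {
            if (!isReady()) throw new IllegalStateException("not ready");
            wire.append(chunk);
            budget--;
        }

        // The container would call this when the socket drains.
        void drain(int newBudget) {
            budget = newBudget;
            if (onWritePossible != null) onWritePossible.run();
        }
    }

    // The WriteListener pattern: write while isReady() is true, then return
    // and wait to be called back -- no thread ever blocks on a slow client.
    static class ChunkWriter {
        final Queue<String> pending = new ArrayDeque<>();
        final MockOutputStream out;

        ChunkWriter(MockOutputStream out, Iterable<String> chunks) {
            this.out = out;
            for (String c : chunks) pending.add(c);
            out.onWritePossible = this::onWritePossible;
        }

        void onWritePossible() {
            // The loop must re-check isReady() before every write.
            while (!pending.isEmpty() && out.isReady()) {
                out.write(pending.poll());
            }
            // Falling out with data still pending is fine: the container
            // calls onWritePossible() again once the socket is writable.
        }
    }

    public static void main(String[] args) {
        MockOutputStream out = new MockOutputStream(2); // room for 2 chunks
        ChunkWriter w = new ChunkWriter(out, java.util.List.of("a", "b", "c", "d"));
        w.onWritePossible();          // initial call by the "container"
        System.out.println(out.wire); // only "ab" went out; nothing blocked
        out.drain(10);                // socket drained; callback fires
        System.out.println(out.wire); // now "abcd"
    }
}
```

The real API delivers the callbacks on container-managed threads, but the shape is the same: the slow client costs you buffered state, not a parked thread.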

+8
Sep 07 '13 at 11:13

We cannot make writes asynchronous. We really have to live with the restriction that when we write something to a client, we expect to be able to do so promptly, and treat it as an error if we cannot. That is, if our goal is to stream data to the client as fast as possible and use the blocking/non-blocking status of the channel as a way to control the flow, we are out of luck. But if we are sending data at a low rate that the client should be able to handle, we are at least able to promptly disconnect clients that do not read fast enough.

For example, in your application we send the keep-alives at a slow rate (every few seconds) and expect clients to be able to keep up with all the events they are sent. We trickle the data out to the client, and if it cannot keep up, we can disconnect it promptly and cleanly. That is a bit more limited than true asynchronous I/O, but it should meet your need (and, incidentally, mine).

The trick is that all of the output methods that just throw IOException actually do a bit more than that: in the implementation, all the calls to things that can be interrupt()ed are wrapped with something like this (taken from Jetty 9):

    catch (InterruptedException x) {
        throw (IOException)new InterruptedIOException().initCause(x);
    }

(I also note that this does not happen in Jetty 8, where the InterruptedException is logged and the blocking loop is immediately retried. Presumably you would want to verify that your servlet container is well-behaved in this respect before relying on this trick.)

That is, when a slow client causes the writing thread to block, we simply force the write to throw an IOException by calling interrupt() on the thread. Think about it: the non-blocking code would consume a unit of time on one of our processing threads to execute anyway, so using blocking writes that are aborted (after, say, one millisecond) is really identical. We still just chew up a small amount of time on the thread, only slightly less efficiently.
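The interrupt-to-InterruptedIOException convention can be observed without a servlet container using JDK pipes: a PipedOutputStream with no reader fills up and blocks the writer, and interrupting the blocked writer surfaces as an InterruptedIOException. This is a self-contained sketch of the mechanism, not the Jetty code path; the buffer and sleep values are arbitrary.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptBlockedWrite {
    static String run() throws Exception {
        // A pipe that nobody reads from fills up quickly, so the writer
        // blocks -- a stand-in for a client that has stopped reading.
        PipedInputStream in = new PipedInputStream(1024);
        PipedOutputStream out = new PipedOutputStream(in);

        AtomicReference<String> result = new AtomicReference<>("no exception");
        Thread writer = new Thread(() -> {
            try {
                byte[] chunk = new byte[512];
                while (true) out.write(chunk); // blocks once the pipe is full
            } catch (InterruptedIOException e) {
                result.set("InterruptedIOException"); // the failed write we forced
            } catch (IOException e) {
                result.set("IOException");
            }
        });
        writer.start();

        Thread.sleep(200);  // give the writer time to fill the pipe and block
        writer.interrupt(); // the "canceller": turn the blocked write into an error
        writer.join();
        return result.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints "InterruptedIOException"
    }
}
```

In the servlet setting, the thread being interrupted is the one blocked inside out.flush(), and the catch block would then call ac.complete() and drop the client.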

I modified the code so that the main timer thread runs a job that bounds the time of each write just before we start the write, and the job is cancelled if the write completes quickly, which it should.

One final note: in a well-implemented servlet container, having the I/O throw ought to be safe. It would be nice if we could catch the InterruptedIOException and retry the write later; perhaps we would then feed slow clients a subset of the events if they cannot keep up with the full stream. As far as I can tell, in Jetty this is not entirely safe: if a write throws, the internal state of the HttpResponse object may not be consistent enough for the write to be safely re-entered later. I expect it is unwise to try to push a servlet container in this way unless there are specific docs I have missed that offer such a guarantee. I think the idea is that a connection is meant to be shut down if an IOException occurs.

Here is the code, with a modified version of RunJob::run() that uses a clumsily simple illustration (in reality we would want to use the main timer thread here, rather than spinning up one thread per write, which is silly).

    public void run() {
        String message = null;
        synchronized(HugeStreamWithThreads.this) {
            if(this.id != HugeStreamWithThreads.this.id) {
                this.id = HugeStreamWithThreads.this.id;
                message = HugeStreamWithThreads.this.message;
            }
        }
        if(message == null)
            message = ":keep-alive\n\n";
        else
            message = formatMessage(message);

        final Thread curr = Thread.currentThread();
        Thread canceller = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(2000);
                    curr.interrupt();
                } catch(InterruptedException e) {
                    // exit
                }
            }
        });
        canceller.start();

        try {
            if(!sendMessage(message)) return;
        } finally {
            canceller.interrupt();
            while (true) {
                try {
                    canceller.join();
                    break;
                } catch (InterruptedException e) {
                }
            }
        }

        boolean once_again = false;
        synchronized(HugeStreamWithThreads.this) {
            if(this.id != HugeStreamWithThreads.this.id)
                once_again = true;
        }
        if(once_again)
            pool.submit(this);
    }
+3
Apr 30

Is Spring an option for you? Spring MVC 3.2 has a class called DeferredResult that will gracefully handle your "10,000 open connections / 10 server pool threads" scenario.

Example: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

+2
Aug 30 '12 at 16:32

I had a quick look through your listing, so I may have missed some points. The advantage of a pool thread is to share thread resources among several tasks over time. Maybe you can solve your problem by spacing the keepAlive responses of the different HTTP connections out in time, instead of grouping them all at the same moment.

-1
Feb 28 '13 at 10:23


