Description of the problem
The Servlet 3.0 API lets you detach the request/response context and respond to it later.
However, if I try to write a large amount of data, something like:
```java
AsyncContext ac = getWaitingContext();
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush();
```
This can block - and it does block in trivial test cases - on both Tomcat 7 and Jetty 8. Tutorials recommend creating a thread pool to handle such writes, which is usually counter-productive for a traditional C10K architecture.
However, if I have 10,000 open connections and a thread pool of, say, 10 threads, even 1% of clients with slow or stalled connections is enough to exhaust the thread pool and completely block the comet responses, or at least slow them down significantly.
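The starvation effect can be reproduced in miniature without any servlet at all: with a fixed-size pool, a couple of tasks that block on a write starve every other connection. A standalone sketch (a `CountDownLatch` stands in for a client that never drains its socket; all names here are invented for illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolStarvation {
    public static void main(String[] args) throws Exception {
        // A pool of 2 workers stands in for the servlet's write pool.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch stalled = new CountDownLatch(1);         // a client that never reads
        CountDownLatch fastClientServed = new CountDownLatch(1);

        // Two "slow clients": each write blocks until the latch is released.
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> {
                try { stalled.await(); } catch (InterruptedException e) { /* exit */ }
            });
        }
        // A "fast client" whose write would complete instantly...
        pool.submit(fastClientServed::countDown);

        // ...but it is never served while the slow clients hold all the workers.
        boolean servedWhileStalled = fastClientServed.await(200, TimeUnit.MILLISECONDS);
        System.out.println("fast client served while pool is stalled: " + servedWhileStalled);

        stalled.countDown();    // the slow clients finally unblock
        boolean servedAfter = fastClientServed.await(2, TimeUnit.SECONDS);
        System.out.println("fast client served afterwards: " + servedAfter);
        pool.shutdown();
    }
}
```

With 10 workers and 100 stalled clients out of 10,000 the arithmetic is the same, just scaled up.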
The expected approach is to receive a write-readiness notification, or a notification of I/O completion, and then continue pushing the data.
How can this be done using the Servlet 3.0 API, i.e. how can I get either:
- a completion notification for an asynchronous I/O operation, or
- non-blocking I/O with a write-readiness notification?
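To make the write-readiness idea concrete, the API I am after would look roughly like the sketch below. Every type here (`WriteReadyListener`, `NonBlockingStream`, `ReadinessSketch`) is hypothetical, invented purely for illustration - nothing like this exists in Servlet 3.0:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical callback: the container would invoke it when the socket can accept more bytes.
interface WriteReadyListener {
    void onWritePossible(NonBlockingStream out);
}

// Toy stand-in for a non-blocking output stream over a small socket buffer.
class NonBlockingStream {
    private final StringBuilder wire = new StringBuilder(); // what actually "reached the client"
    private final Queue<String> pending = new ArrayDeque<>();
    private int budget;                                     // free space in the fake socket buffer
    private WriteReadyListener listener;

    void setWriteListener(WriteReadyListener l) { listener = l; }
    boolean isReady() { return budget > 0; }

    // Writes as much as fits and queues the rest - no thread ever blocks here.
    void write(String data) {
        int n = Math.min(budget, data.length());
        wire.append(data, 0, n);
        budget -= n;
        if (n < data.length()) pending.add(data.substring(n));
    }

    // Simulates the client draining its socket; fires the readiness callback.
    void clientDrains(int bytes) {
        budget += bytes;
        while (isReady() && !pending.isEmpty()) write(pending.poll());
        if (isReady() && listener != null) listener.onWritePossible(this);
    }

    String received() { return wire.toString(); }
}

public class ReadinessSketch {
    public static void main(String[] args) {
        NonBlockingStream out = new NonBlockingStream();
        out.setWriteListener(s -> s.write("more-data\n")); // push more only when ready
        out.write("big-message-part-1\n"); // nothing fits yet: queued, no thread blocked
        out.clientDrains(19);              // slow client finally reads; queued data flows
        out.clientDrains(64);              // now there is spare room: callback fires
        System.out.println(out.received());
    }
}
```

The point of the sketch is that a slow client merely delays its own callbacks; it never occupies a worker thread that other clients need.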
If this is not supported by the Servlet 3.0 API, are there any server-specific APIs (e.g. Jetty Continuations or Tomcat CometEvent) that allow these events to be handled truly asynchronously, without faking asynchronous I/O with a thread pool?
Does anybody know?
And if this is not possible, can you confirm it with a reference to the documentation?
Demonstration of the problem with sample code
I have attached code below that emulates an event stream.
Notes:
- it uses ServletOutputStream, which throws an IOException, to detect disconnected clients
- it sends keep-alive messages to make sure clients are still there
- I created a thread pool to "emulate" asynchronous operations
In this example, I explicitly defined a thread pool of size 1 to show the problem:
- Launch the application.
- From two terminals, run `curl http://localhost:8080/path/to/app` (twice).
- Now send data with `curl -d m=message http://localhost:8080/path/to/app`.
- Both clients receive the data.
- Now suspend one of the clients (Ctrl+Z) and send the message again with `curl -d m=message http://localhost:8080/path/to/app`.
- Observe that the other, non-suspended client either receives nothing or, once the message has been sent, stops receiving keep-alives, because the only worker thread is blocked.
I want to solve this problem without a thread pool, because with 1,000-5,000 open connections I could exhaust the pool very quickly.
Sample code below.
```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;

@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {
    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool =
        new ThreadPoolExecutor(1, 1, 50000L, TimeUnit.MILLISECONDS,
                               new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purposes

    private final Thread timer = new Thread(new Runnable() {
        public void run() {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            } catch(InterruptedException e) {
                // exit
            }
        }
    });

    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;

        RunJob(AsyncContext ac) {
            this.ac = ac;
        }

        public void keepAlive() {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }

        String formatMessage(String msg) {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i = 0; i < 100000; i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }

        public void run() {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);
            if(!sendMessage(message))
                return;
            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);
        }

        boolean sendMessage(String message) {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            } catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        timer.start();
    }

    @Override
    public void destroy() {
        for(;;) {
            try {
                timer.interrupt();
                timer.join();
                break;
            } catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }

    protected synchronized void removeContext(RunJob ac) {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");
        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();
        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id != 0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive() {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }
}
```
The above example uses threads to avoid blocking... but if the number of blocked clients exceeds the size of the thread pool, it blocks anyway.
How can this be implemented without blocking?