Server-Sent Events with Jersey: EventOutput is not closed after the client crashes

I am using Jersey to implement an SSE scenario.

The server keeps the connections open and periodically sends data to the clients.

In my scenario there is a connection limit: only a certain number of clients can be subscribed to the server at a time.

So, when a new client tries to subscribe, I check EventOutput.isClosed() to find out whether any old connections are no longer active, so that they can free up room for new connections.

But EventOutput.isClosed() always returns false unless the client explicitly closes the EventSource. This means that if a client goes away unexpectedly (power outage or dropped Internet connection), it still occupies a connection slot and new clients cannot subscribe.

Is there any workaround for this?

2 answers

@CuiPengFei,

So, in my travels trying to find an answer to this, I came across a repository that shows how to handle graceful cleanup of connections from disconnected clients.

Encapsulate all of the SSE EventOutput logic in a Service / Manager. In this case, they start a scheduled thread that checks whether each EventOutput has been closed by the client. If so, they formally close the connection (EventOutput#close()). If not, they try to write to the stream. If the write throws an exception, the client has disconnected without closing, so they close the connection themselves. If the write succeeds, the EventOutput stays in the pool because it is still an active connection.

The repo (and the actual class) is available here. I've also included the class below, without its imports, in case the repo is ever deleted.

Note that they bind this class as a Singleton. The connection map must be globally unique.

    public class SseWriteManager {

        private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>();
        private final ScheduledExecutorService messageExecutorService;
        private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class);

        public SseWriteManager() {
            // Probe all registered connections every 5 seconds on a single background thread.
            messageExecutorService = Executors.newScheduledThreadPool(1);
            messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS);
        }

        public void addSseConnection(String id, EventOutput eventOutput) {
            logger.info("adding connection for id={}.", id);
            connectionMap.put(id, eventOutput);
        }

        private class messageProcessor implements Runnable {
            @Override
            public void run() {
                try {
                    Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator();
                    while (iterator.hasNext()) {
                        boolean remove = false;
                        Map.Entry<String, EventOutput> entry = iterator.next();
                        EventOutput eventOutput = entry.getValue();
                        if (eventOutput != null) {
                            if (eventOutput.isClosed()) {
                                remove = true;
                            } else {
                                try {
                                    logger.info("writing to id={}.", entry.getKey());
                                    eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build());
                                } catch (Exception ex) {
                                    logger.info(String.format("write failed to id=%s.", entry.getKey()), ex);
                                    remove = true;
                                }
                            }
                        }
                        if (remove) {
                            // We are removing the eventOutput; close it if it is not already closed.
                            if (!eventOutput.isClosed()) {
                                try {
                                    eventOutput.close();
                                } catch (Exception ex) {
                                    // do nothing.
                                }
                            }
                            iterator.remove();
                        }
                    }
                } catch (Exception ex) {
                    logger.error("messageProcessor.run threw exception.", ex);
                }
            }
        }

        public void shutdown() {
            if (messageExecutorService != null && !messageExecutorService.isShutdown()) {
                logger.info("SseWriteManager.shutdown: calling messageExecutorService.shutdown.");
                messageExecutorService.shutdown();
            } else {
                logger.info("SseWriteManager.shutdown: messageExecutorService == null || messageExecutorService.isShutdown().");
            }
        }
    }
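To tie this back to the connection limit in the question, here is a minimal sketch of the same probe-by-writing idea applied at subscription time. It is not taken from the repository: `Connection` is a hypothetical stand-in for Jersey's EventOutput so that the example runs without Jersey on the classpath, and `LimitedRegistry`, `trySubscribe`, `evictDead` and `FakeConnection` are names invented for illustration.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for Jersey's EventOutput (assumption, not the real API).
interface Connection {
    boolean isClosed();
    void write(String data) throws IOException;
    void close() throws IOException;
}

// Hypothetical registry that enforces a connection limit and evicts dead clients.
class LimitedRegistry {
    private final int maxConnections;
    private final Map<String, Connection> connections = new ConcurrentHashMap<>();

    LimitedRegistry(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Probe every connection with a heartbeat write; drop those that are closed or fail.
    synchronized int evictDead() {
        int evicted = 0;
        Iterator<Map.Entry<String, Connection>> it = connections.entrySet().iterator();
        while (it.hasNext()) {
            Connection c = it.next().getValue();
            boolean dead = c.isClosed();
            if (!dead) {
                try {
                    c.write("heartbeat");
                } catch (IOException ex) {
                    dead = true; // the client went away without closing the stream
                }
            }
            if (dead) {
                try {
                    c.close();
                } catch (IOException ignored) {
                    // nothing more to do for a dead connection
                }
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    // Accept a new subscriber if there is room; when full, probe for dead connections first.
    synchronized boolean trySubscribe(String id, Connection c) {
        if (connections.size() >= maxConnections) {
            evictDead();
        }
        if (connections.size() >= maxConnections) {
            return false;
        }
        connections.put(id, c);
        return true;
    }

    int size() {
        return connections.size();
    }
}

// Test double that simulates a client which may crash without closing the stream.
class FakeConnection implements Connection {
    boolean crashed = false;
    boolean closed = false;

    public boolean isClosed() {
        return closed;
    }

    public void write(String data) throws IOException {
        if (crashed) {
            throw new IOException("broken pipe");
        }
    }

    public void close() {
        closed = true;
    }
}
```

In this sketch a full registry only rejects a new subscriber after every existing connection has survived a heartbeat write. On a real socket the first write after a crash may still succeed because the data is only buffered, so a periodic probe like the one in SseWriteManager is likely still needed alongside it.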

An update on this subject:

What was happening is that the client-side EventSource (JS) never reached readyState 1 unless we did a broadcast as soon as a new subscription was added. Even in that state, the client could still receive data pushed from the server. Adding a call that broadcasts a simple "OK" message helped bring the EventSource to readyState 1.

On closing the connection from the client side: if you want to be proactive about cleaning up resources, simply closing the EventSource on the client does not help. We need to make another AJAX call to the server to force it to do a broadcast. When the broadcast is forced, Jersey cleans up the connections that are no longer alive and in turn releases their resources (connections in CLOSE_WAIT). Otherwise, a connection lingers in CLOSE_WAIT until the next broadcast happens.


Source: https://habr.com/ru/post/1232215/

