@CuiPengFei,
So, in my travels, trying to find the answer to this, I came across a repository that explains how to handle the elegant cleansing of connections from disconnected clients.
Encapsulate all SSE EventOutput logic in Service / Manager. In this case, they deploy a thread that checks to see if the EventOutput is closed by the client. If so, they officially close the connection (EventOutput # close ()). If not, they try to write to the stream. If he throws an exception, the client disconnects without closing, and he closes it. If the recording is successful, EventOutput returns to the pool because it is still an active connection.
Repo (and actual class) are available here . Ive also included a non-import class below in the case when the repo is ever deleted.
Note that they associate this with Singleton. The store must be globally unique.
public class SseWriteManager { private final ConcurrentHashMap<String, EventOutput> connectionMap = new ConcurrentHashMap<>(); private final ScheduledExecutorService messageExecutorService; private final Logger logger = LoggerFactory.getLogger(SseWriteManager.class); public SseWriteManager() { messageExecutorService = Executors.newScheduledThreadPool(1); messageExecutorService.scheduleWithFixedDelay(new messageProcessor(), 0, 5, TimeUnit.SECONDS); } public void addSseConnection(String id, EventOutput eventOutput) { logger.info("adding connection for id={}.", id); connectionMap.put(id, eventOutput); } private class messageProcessor implements Runnable { @Override public void run() { try { Iterator<Map.Entry<String, EventOutput>> iterator = connectionMap.entrySet().iterator(); while (iterator.hasNext()) { boolean remove = false; Map.Entry<String, EventOutput> entry = iterator.next(); EventOutput eventOutput = entry.getValue(); if (eventOutput != null) { if (eventOutput.isClosed()) { remove = true; } else { try { logger.info("writing to id={}.", entry.getKey()); eventOutput.write(new OutboundEvent.Builder().name("custom-message").data(String.class, "EOM").build()); } catch (Exception ex) { logger.info(String.format("write failed to id=%s.", entry.getKey()), ex); remove = true; } } } if (remove) {
source share