Jersey SseBroadcaster: Detecting Closed Connections

I believe this question is not a duplicate of "Server sent event with Jersey: EventOutput is not closed after client drops", but it is probably related to "Jersey Server-Sent Events - write to broken connection does not throw exception".

Chapter 15.4.2 of the Jersey documentation says this about SseBroadcaster:

However, the SseBroadcaster also internally identifies and handles client disconnects. When a client closes the connection, the broadcaster detects this, removes the stale connection from the internal collection of registered EventOutputs, and frees all server-side resources associated with the stale connection.

I cannot confirm this. In the following test I subclass SseBroadcaster, and the subclass's onClose() method is never called: neither when the EventInput is closed, nor when another message is broadcast.

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertNotNull;
    import static org.junit.Assert.assertTrue;

    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicInteger;

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.Application;
    import javax.ws.rs.core.MediaType;

    import org.glassfish.jersey.media.sse.EventInput;
    import org.glassfish.jersey.media.sse.EventOutput;
    import org.glassfish.jersey.media.sse.InboundEvent;
    import org.glassfish.jersey.media.sse.OutboundEvent;
    import org.glassfish.jersey.media.sse.SseBroadcaster;
    import org.glassfish.jersey.media.sse.SseFeature;
    import org.glassfish.jersey.server.ChunkedOutput;
    import org.glassfish.jersey.server.ResourceConfig;
    import org.glassfish.jersey.test.JerseyTest;
    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class NotificationsResourceTest extends JerseyTest {

        static final Logger log = LoggerFactory.getLogger(NotificationsResourceTest.class);

        static final CountingSseBroadcaster broadcaster = new CountingSseBroadcaster();

        // Broadcaster that counts registered and closed connections.
        public static class CountingSseBroadcaster extends SseBroadcaster {

            final AtomicInteger connectionCounter = new AtomicInteger(0);

            public EventOutput createAndAttachEventOutput() {
                EventOutput output = new EventOutput();
                if (add(output)) {
                    int cons = connectionCounter.incrementAndGet();
                    log.debug("Active connection count: " + cons);
                }
                return output;
            }

            @Override
            public void onClose(final ChunkedOutput<OutboundEvent> output) {
                int cons = connectionCounter.decrementAndGet();
                log.debug("A connection has been closed. Active connection count: " + cons);
            }

            @Override
            public void onException(final ChunkedOutput<OutboundEvent> chunkedOutput, final Exception exception) {
                log.trace("An exception has been detected", exception);
            }

            public int getConnectionCount() {
                return connectionCounter.get();
            }
        }

        @Path("notifications")
        public static class NotificationsResource {

            @GET
            @Produces(SseFeature.SERVER_SENT_EVENTS)
            public EventOutput subscribe() {
                log.debug("New stream subscription");
                EventOutput eventOutput = broadcaster.createAndAttachEventOutput();
                return eventOutput;
            }
        }

        @Override
        protected Application configure() {
            ResourceConfig config = new ResourceConfig(NotificationsResource.class);
            config.register(SseFeature.class);
            return config;
        }

        @Test
        public void test() throws Exception {
            // check that there are no connections
            assertEquals(0, broadcaster.getConnectionCount());

            // connect subscriber
            log.info("Connecting subscriber");
            EventInput eventInput = target("notifications").request().get(EventInput.class);
            assertFalse(eventInput.isClosed());

            // now there are connections
            assertEquals(1, broadcaster.getConnectionCount());

            // push data
            log.info("Broadcasting data");
            String payload = UUID.randomUUID().toString();
            OutboundEvent chunk = new OutboundEvent.Builder()
                    .mediaType(MediaType.TEXT_PLAIN_TYPE)
                    .name("message")
                    .data(payload)
                    .build();
            broadcaster.broadcast(chunk);

            // read data
            log.info("Reading data");
            InboundEvent inboundEvent = eventInput.read();
            assertNotNull(inboundEvent);
            assertEquals(payload, inboundEvent.readData());

            // close subscription
            log.info("Closing subscription");
            eventInput.close();
            assertTrue(eventInput.isClosed());

            // at this point, the subscriber has disconnected itself,
            // but Jersey doesn't realise that
            assertEquals(1, broadcaster.getConnectionCount());

            // wait, give TCP a chance to close the connection
            log.debug("Sleeping for some time");
            Thread.sleep(10000);

            // push data again, this should really flush out the not-connected client
            log.info("Broadcasting data again");
            broadcaster.broadcast(chunk);
            Thread.sleep(100);

            // there is no subscriber anymore
            assertEquals(0, broadcaster.getConnectionCount()); // FAILS!
        }
    }

Maybe JerseyTest is not a good way to test this. In a less clinical setup that uses a JavaScript EventSource, I do see onClose() being called, but only after a message has been broadcast on the previously closed connection.
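The observation that onClose() fires only after a broadcast to an already closed connection is consistent with a broadcaster that learns about a dead connection only when a write to it fails. The following is a minimal sketch of that idea in plain Java, not Jersey's actual implementation; Sink, PruningBroadcaster, startHeartbeat and PruningDemo are hypothetical names introduced for illustration only. The optional heartbeat shows one way such a design could prune dead connections even when no real events are being sent.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event output; this is not a Jersey type.
interface Sink {
    void write(String message) throws IOException;
}

// Simplified model: a dead sink is noticed only when a write to it fails.
class PruningBroadcaster {
    private final List<Sink> sinks = new CopyOnWriteArrayList<>();

    void add(Sink sink) {
        sinks.add(sink);
    }

    int size() {
        return sinks.size();
    }

    // Writes to every sink; a sink whose write throws is removed and reported.
    void broadcast(String message) {
        for (Sink sink : sinks) { // iterates over a snapshot, so removal is safe
            try {
                sink.write(message);
            } catch (IOException e) {
                sinks.remove(sink);
                onClose(sink);
            }
        }
    }

    // Hook for subclasses, similar in spirit to an onClose callback.
    protected void onClose(Sink sink) {
    }

    // Assumption: a periodic dummy message is acceptable to clients.
    // It forces writes, so failed sinks are pruned without real traffic.
    void startHeartbeat(ScheduledExecutorService scheduler, long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> broadcast(": heartbeat"),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}

class PruningDemo {
    public static void main(String[] args) {
        PruningBroadcaster broadcaster = new PruningBroadcaster();
        broadcaster.add(message -> { });                                   // healthy client
        broadcaster.add(message -> { throw new IOException("peer gone"); }); // disconnected client

        System.out.println(broadcaster.size()); // 2: nothing has been written yet
        broadcaster.broadcast("hello");
        System.out.println(broadcaster.size()); // 1: the failing sink was pruned
    }
}
```

In this model nothing changes until something is written. On a real TCP connection the first write after the peer has closed may still succeed, so detection can lag behind by one or more writes.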

What am I doing wrong?

Why does SseBroadcaster not detect a client closing a connection?

Update

I found JERSEY-2833, which was rejected with the resolution "Works as designed":

According to the Jersey documentation, SSE chapter ( https://jersey.java.net/documentation/latest/sse.html ), section 15.4.1, Jersey does not explicitly close the connection; that is the responsibility of the resource method or the client.

What does that mean? Should the resource enforce a timeout and kill all connections, both active ones and those already closed by the client?

3 answers

I believe that it is better to set a timeout on your resource and kill only this connection, for example:

    @Path("notifications")
    public static class NotificationsResource {

        @GET
        @Produces(SseFeature.SERVER_SENT_EVENTS)
        public EventOutput subscribe() {
            log.debug("New stream subscription");
            final EventOutput eventOutput = broadcaster.createAndAttachEventOutput();

            // Close this connection after a 10 second timeout.
            new Timer().schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        eventOutput.close();
                    } catch (IOException e) {
                        log.debug("Failed to close event output", e);
                    }
                }
            }, 10000);

            return eventOutput;
        }
    }
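Creating a new Timer for each subscription starts one thread per connection. As a hedged alternative, the sketch below schedules the close on a single shared scheduler thread. It uses only the JDK; TimeoutCloser, closeAfter and TimeoutDemo are hypothetical names, and the sketch assumes the output type implements java.io.Closeable.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: closes any Closeable after a delay, using one shared thread.
class TimeoutCloser {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "timeout-closer");
                thread.setDaemon(true); // do not keep the JVM alive
                return thread;
            });

    // Schedules resource.close() and returns a handle that can cancel the timeout.
    ScheduledFuture<?> closeAfter(Closeable resource, long delay, TimeUnit unit) {
        return scheduler.schedule(() -> {
            try {
                resource.close();
            } catch (IOException e) {
                // The peer may already be gone; there is nothing more to release.
            }
        }, delay, unit);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}

class TimeoutDemo {
    public static void main(String[] args) throws Exception {
        TimeoutCloser closer = new TimeoutCloser();
        AtomicBoolean closed = new AtomicBoolean(false);

        // A lambda stands in for the real output; it only records that close() ran.
        ScheduledFuture<?> pending =
                closer.closeAfter(() -> closed.set(true), 50, TimeUnit.MILLISECONDS);

        pending.get(); // wait until the scheduled close has run
        System.out.println(closed.get()); // true
        closer.shutdown();
    }
}
```

In a resource method, the call would look like closer.closeAfter(eventOutput, 10, TimeUnit.SECONDS), with the returned future kept if the timeout ever needs to be cancelled.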

I wonder whether subclassing might have changed the behavior.

    @Override
    public void onClose(final ChunkedOutput<OutboundEvent> output) {
        int cons = connectionCounter.decrementAndGet();
        log.debug("A connection has been closed. Active connection count: " + cons);
    }

In this override you do not close the ChunkedOutput, so the connection is not released. Could this be the problem?


The documentation for the constructor org.glassfish.jersey.media.sse.SseBroadcaster.SseBroadcaster() says:

Creates a new instance. If this constructor is called by a subclass, it assumes that the reason for the existence of the subclass is the implementation of the onClose(org.glassfish.jersey.server.ChunkedOutput) and onException(org.glassfish.jersey.server.ChunkedOutput, Exception) methods, so it adds a newly created instance as a listener. To avoid this, subclasses can call SseBroadcaster(Class) , passing their class as an argument.

Therefore, do not rely on the default constructor; instead, add a constructor that calls super with your class:

    public CountingSseBroadcaster() {
        super(CountingSseBroadcaster.class);
    }

Source: https://habr.com/ru/post/1232214/

