I wrote a Spring RestController that returns an SseEmitter (for the server-dispatched event) and adds HATEOAS references to each event. Here is a simplified but working example of this controller:
package hello; import static org.springframework.hateoas.mvc.ControllerLinkBuilder.linkTo; import static org.springframework.hateoas.mvc.ControllerLinkBuilder.methodOn; import hello.Greeting.Status; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @RestController public class GreetingController { private static final Logger log = LoggerFactory.getLogger(GreetingController.class); private static final String template = "Hello, %s!"; class GreetingRequestHandler implements Runnable { private ResponseBodyEmitter emitter; private Greeting greeting; public GreetingRequestHandler(final ResponseBodyEmitter emitter, final Greeting greeting) { this.emitter = emitter; this.greeting = greeting; } @Override public void run() { try { log.info(this.greeting.toString()); this.emitter.send(this.greeting); Thread.sleep(5000); if (Status.COMPLETE.equals(this.greeting.getStatus())) { this.emitter.complete(); } else { this.greeting.incrementStatus(); new Thread(new GreetingRequestHandler(this.emitter, this.greeting)).start(); } } catch (IOException | InterruptedException e) { e.printStackTrace(); } } } @RequestMapping(path = "/greeting") public SseEmitter greeting(@RequestParam(value = "name", defaultValue = "World") final String name) { SseEmitter emitter = new SseEmitter(); Greeting greeting = new Greeting(String.format(template, name)); greeting.add(linkTo(methodOn(GreetingController.class).greeting(name)).withSelfRel()); new Thread(new GreetingRequestHandler(emitter, greeting)).start(); log.info("returning emitter"); return emitter; } }
The Greeting class is as follows:
package hello; import java.util.concurrent.atomic.AtomicInteger; import org.springframework.hateoas.ResourceSupport; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; @JsonIgnoreProperties(ignoreUnknown = true) public class Greeting extends ResourceSupport { private final String content; private final static AtomicInteger idProvider = new AtomicInteger(); private int greetingId; private Status status; enum Status { ENQUEUED, PROCESSING, COMPLETE; } @JsonCreator public Greeting(@JsonProperty("content") final String content) { this.greetingId = idProvider.addAndGet(1); this.status = Status.ENQUEUED; this.content = content; } public Status getStatus() { return this.status; } protected void setStatus(final Status status) { this.status = status; } public int getGreetingId() { return this.greetingId; } public String getContent() { return this.content; } @Override public String toString() { return "Greeting{id='" + this.greetingId + "', status='" + this.status + "' content='" + this.content + "', " + super.toString() + "}"; } public void incrementStatus() { switch (this.status) { case ENQUEUED: this.status = Status.PROCESSING; break; case PROCESSING: this.status = Status.COMPLETE; break; default: break; } } }
This code works just fine. If I try to access the REST service using a web browser, I see that the events are displayed with the correct content and link.
The result is as follows (each event appears 5 seconds after the previous one):
data:{"content":"Hello, Kraal!","greetingId":8,"status":"ENQUEUED","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}} data:{"content":"Hello, Kraal!","greetingId":8,"status":"PROCESSING","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}} data:{"content":"Hello, Kraal!","greetingId":8,"status":"COMPLETE","_links":{"self":{"href":"http://localhost:8080/greeting?name=Kraal"}}}
Now I need to call this REST service and read these events from another Spring application ... But I don't know how to write client code using Spring. This does not work, since RestTemplate designed for synchronous HTTP access on the client side ...
ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); mapper.registerModule(new Jackson2HalModule()); // required for HATEOAS MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter(); converter.setSupportedMediaTypes(MediaType.parseMediaTypes("application/hal+json")); converter.setObjectMapper(mapper); // required in order to be able to read serialized objects MappingJackson2HttpMessageConverter converter2 = new MappingJackson2HttpMessageConverter(); converter2.setSupportedMediaTypes(MediaType.parseMediaTypes("application/octet-stream")); converter2.setObjectMapper(mapper); // required to understand SSE events MappingJackson2HttpMessageConverter converter3 = new MappingJackson2HttpMessageConverter(); converter3.setSupportedMediaTypes(MediaType.parseMediaTypes("text/event-stream")); List<HttpMessageConverter<?>> converters = new ArrayList<HttpMessageConverter<?>>(); converters.add(converter); converters.add(converter2); converters.add(converter3); // probably wrong template RestTemplate restTemplate = new RestTemplate(); restTemplate = new RestTemplate(converters); // this does not work as I receive events and no a single object Greeting greeting = restTemplate.getForObject("http://localhost:8080/greeting/?name=Kraal", Greeting.class); log.info(greeting.toString());
The error message I get is:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'data': was expecting ('true', 'false' or 'null')
Indeed, each event is an SSE event and begins with "data:" ...
So the questions are :
- which module of the ObjectMapper module should I register in order to be able to map SSE to Jackson?
- How can I subscribe to incoming SSE events (observer pattern) using Spring?
Thanks in advance.
Side note . Since I'm struggling to do this using Spring, I tried to do this using SSE support for Jersey as follows. Using "Jersey", I get events as expected, but then I can not assign them to the Greeting class (for the same reason as above, I think I do not have the correct converter module.):
Client client = ClientBuilder.newBuilder().register(converter).register(SseFeature.class).build(); WebTarget target = client.target("http://localhost:8080/greeting/?name=Kraal"); EventInput eventInput = target.request().get(EventInput.class); while (!eventInput.isClosed()) { final InboundEvent inboundEvent = eventInput.read(); if (inboundEvent == null) {