ReactorNettyWebSocketClient Examples

The new Spring has a WebSocketClient example in the Spring documentation .

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute("ws://localhost:8080/echo"), session -> {... }).blockMillis(5000);

But it is very short and incomprehensible:

  • How to send a message to the server (subscribe to the channel)?
  • Then process the incoming stream and send Flux messages?
  • Connect to server when connection is interrupted?

Can someone provide a more complex example?

UPD. I tried to do something like:

public Flux<String> getStreaming() {

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();
    Flux<String> input = Flux.just("{ event: 'subscribe', channel: 'examplpe' }");

    Mono<Void> sessionMono = client.execute(URI.create("ws://api.example.com/"),
            session -> session
                    .send(input.map(session::textMessage))
                    .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then())
                    .then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

But this only returns one message. As if I didn’t receive a subscription.

+4
source share
1 answer

, "". , -, "" .

websocket. , .

ws://echo.websocket.org . /stream, .

@GetMapping(path = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreaming() throws URISyntaxException {

    Flux<String> input = Flux.<String>generate(sink -> sink.next(String.format("{ message: 'got message', date: '%s' }", new Date())))
        .delayElements(Duration.ofSeconds(1));

    WebSocketClient client = new ReactorNettyWebSocketClient();
    EmitterProcessor<String> output = EmitterProcessor.create();

    Mono<Void> sessionMono = client.execute(URI.create("ws://echo.websocket.org"), session -> session.send(input.map(session::textMessage))
        .thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).subscribeWith(output).then()).then());

    return output.doOnSubscribe(s -> sessionMono.subscribe());
}

, ...

+2

Source: https://habr.com/ru/post/1689437/


All Articles