Http Websocket as Akka Stream Source

I would like to listen to a website using akka streams. That is, I would like to consider this as nothing but Source.

However, all official examples relate to connecting web sockets as Flow.

My current approach is used websocketClientFlowin conjunction with Source.maybe. This ultimately leads to an upstream failure due to TcpIdleTimeoutExceptionwhen a new stream is Messagenot sent downstream.

Therefore, my question is twofold:

  • Is there a way that I obviously missed out on - considering websocket as simple Source?
  • If used Flow, the only option is how to handle it correctly TcpIdleTimeoutException? Exception cannot be achieved by providing a flow control strategy. Restarting the source using RestartSourcedoes not help either, because the source is not a problem.

Update

So, I tried two different approaches, setting the wait timeout to 1 second for convenience

application.conf

akka.http.client.idle-timeout = 1s

Using keepAlive (as suggested by Stefano)

Source.<Message>maybe()
    .keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
    .viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
    { ... }

However, Upstream still does not work with TcpIdleTimeoutException.

Using RestartFlow

However, I found out about this approach using RestartFlow:

final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
    Duration.apply(3, TimeUnit.SECONDS),
    Duration.apply(30, TimeUnit.SECONDS),
    0.2,
    () -> createWebsocketFlow(system, websocketUri)
);

Source.<Message>maybe()
    .viaMat(restartWebsocketFlow, Keep.right()) // One can treat this part of the resulting graph as a `Source<Message, NotUsed>`
    { ... }

(...)

private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
    return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}

This works in that I can consider websocket as a source (albeit artificially, as Stefano explained) and keep the tcp connection alive by restarting websocketClientFlowwhen it does Exception.

.

+4
2
  • . WebSocket , Akka-HTTP Flow. , Flow "" , Flow.fromSinkAndSource(Sink.ignore, mySource), Flow.fromSinkAndSource(mySink, Source.maybe), .

  • :

    WebSocket idle-timeout. , --, "keep-alive" .

    keep-alive, . Akka. NB: .

    src.keepAlive(1.second, () => TextMessage.Strict("ping"))

+3

, . asSourceOf?

path("measurements") {
  entity(asSourceOf[Measurement]) { measurements =>
    // measurement has type Source[Measurement, NotUsed]
    ...
  }
}
0

Source: https://habr.com/ru/post/1691880/


All Articles