I would like to listen to a website using akka streams. That is, I would like to consider this as nothing but Source
.
However, all official examples relate to connecting web sockets as Flow
.
My current approach is used websocketClientFlow
in conjunction with Source.maybe
. This ultimately leads to an upstream failure due to TcpIdleTimeoutException
when a new stream is Message
not sent downstream.
Therefore, my question is twofold:
- Is there a way that I obviously missed out on - considering websocket as simple
Source
? - If used
Flow
, the only option is how to handle it correctly TcpIdleTimeoutException
? Exception cannot be achieved by providing a flow control strategy. Restarting the source using RestartSource
does not help either, because the source is not a problem.
Update
So, I tried two different approaches, setting the wait timeout to 1 second for convenience
application.conf
akka.http.client.idle-timeout = 1s
Using keepAlive (as suggested by Stefano)
Source.<Message>maybe()
.keepAlive(Duration.apply(1, "second"), () -> (Message) TextMessage.create("keepalive"))
.viaMat(Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri)), Keep.right())
{ ... }
However, Upstream still does not work with TcpIdleTimeoutException
.
Using RestartFlow
However, I found out about this approach using RestartFlow
:
final Flow<Message, Message, NotUsed> restartWebsocketFlow = RestartFlow.withBackoff(
Duration.apply(3, TimeUnit.SECONDS),
Duration.apply(30, TimeUnit.SECONDS),
0.2,
() -> createWebsocketFlow(system, websocketUri)
);
Source.<Message>maybe()
.viaMat(restartWebsocketFlow, Keep.right())
{ ... }
(...)
private Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> createWebsocketFlow(final ActorSystem system, final String websocketUri) {
return Http.get(system).webSocketClientFlow(WebSocketRequest.create(websocketUri));
}
This works in that I can consider websocket as a source (albeit artificially, as Stefano explained) and keep the tcp connection alive by restarting websocketClientFlow
when it does Exception
.
.