I am getting started with akka-streams and want to build a simple example. In Chrome, using a WebSocket plugin, I can connect to a stream such as the one described at https://blockchain.info/api/api_websocket via wss://ws.blockchain.info/inv and send 2 commands:
{"op":"ping"}
{"op":"unconfirmed_sub"}
The results are then streamed into the Chrome WebSocket plugin window.
I tried to implement the same functionality with akka-streams, but ran into some problems:
- both commands are executed, but I do not get any streaming output
- the same command appears to be executed twice (the ping command)
I followed the manual at http://doc.akka.io/docs/akka/2.4.7/scala/http/client-side/websocket-support.html and http://doc.akka.io/docs/akka-http/10.0.0/scala/http/client-side/websocket-support.html#half-closed-client-websockets
Here is my adaptation:
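import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future

object SingleWebSocketRequest extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Prints every strict text message received from the server.
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)
    }

  // The two commands to send: a ping and a subscription to unconfirmed transactions.
  val commandMessages = Seq(TextMessage("{\"op\":\"ping\"}"), TextMessage("{\"op\":\"unconfirmed_sub\"}"))
  val helloSource: Source[Message, NotUsed] = Source(commandMessages.to[scala.collection.immutable.Seq])

  val flow: Flow[Message, Message, Future[Done]] =
    Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

  val (upgradeResponse, closed) =
    Http().singleWebSocketRequest(WebSocketRequest("wss://ws.blockchain.info/inv"), flow)

  // Check that the server accepted the WebSocket upgrade.
  val connected = upgradeResponse.map { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Done
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)
  closed.foreach(_ => println("closed"))
}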
When I use the flow-based version from http://doc.akka.io/docs/akka-http/10.0.0/scala/http/client-side/websocket-support.html#websocketclientflow, adapted as shown below, the result is again printed twice:
{"op":"pong"}
{"op":"pong"}
See code:
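import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

object WebSocketClientFlow extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Prints every strict text message received from the server.
  val incoming: Sink[Message, Future[Done]] =
    Sink.foreach[Message] {
      case message: TextMessage.Strict =>
        println(message.text)
    }

  // The two commands to send: a ping and a subscription to unconfirmed transactions.
  val commandMessages = Seq(TextMessage("{\"op\":\"ping\"}"), TextMessage("{\"op\":\"unconfirmed_sub\"}"))
  val outgoing: Source[Message, NotUsed] = Source(commandMessages.to[scala.collection.immutable.Seq])

  val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv"))

  // Keep the upgrade response (from the flow) and the completion future (from the sink).
  val (upgradeResponse, closed) =
    outgoing
      .viaMat(webSocketFlow)(Keep.right)
      .toMat(incoming)(Keep.both)
      .run()

  // Check that the server accepted the WebSocket upgrade.
  val connected = upgradeResponse.flatMap { upgrade =>
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
      Future.successful(Done)
    } else {
      throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
    }
  }

  connected.onComplete(println)
  closed.foreach(_ => {
    println("closed")
    system.terminate()
  })
}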
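One possible cause of the missing streaming output, going by the half-closed WebSockets section of the manual linked above: both attempts use a finite Source, and once the outgoing side completes, akka-http closes the whole connection. The sketch below is a variation of the first attempt that appends Source.maybe so the outgoing side stays open after the two commands. The object name KeepOpenSketch and the value name keepOpen are made up for illustration, and this is an untested assumption about what the server expects, not a confirmed fix.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Future, Promise}

// Hypothetical name, used only for this sketch.
object KeepOpenSketch {
  // The two commands from the question.
  val commands: List[TextMessage.Strict] = List(
    TextMessage("""{"op":"ping"}"""),
    TextMessage("""{"op":"unconfirmed_sub"}"""))

  // Source.maybe emits nothing and stays open until its promise is completed,
  // so the outgoing side does not complete after the second command.
  val outgoing: Source[Message, Promise[Option[Message]]] =
    Source[Message](commands).concatMat(Source.maybe[Message])(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Handle every Message subtype so an unexpected one cannot fail the sink with a MatchError.
    val printSink: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case strict: TextMessage.Strict => println(strict.text)
      case streamed: TextMessage      => streamed.textStream.runFold("")(_ + _).foreach(println)
      case binary: BinaryMessage      => binary.dataStream.runWith(Sink.ignore)
    }

    val flow: Flow[Message, Message, (Future[Done], Promise[Option[Message]])] =
      Flow.fromSinkAndSourceMat(printSink, outgoing)(Keep.both)

    val (upgradeResponse, (closed, keepOpen)) =
      Http().singleWebSocketRequest(WebSocketRequest("wss://ws.blockchain.info/inv"), flow)

    upgradeResponse.foreach(upgrade => println(s"Upgrade status: ${upgrade.response.status}"))
    closed.onComplete { _ => println("closed"); system.terminate() }
    // To close the connection deliberately later: keepOpen.success(None)
  }
}
```

Completing the promise with keepOpen.success(None) ends the outgoing side, which should in turn close the connection.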
How can I achieve the same result as in Chrome?
- print the output of the subscribed stream
- ideally, also send periodic updates (ping commands), as indicated in https://blockchain.info/api/api, via {"op":"ping"} messages
Versions: akka 2.4.17, akka-http 10.0.5
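For the periodic ping, one option might be Source.tick, which emits the same element at a fixed interval and does not complete on its own, so it would also keep the outgoing side open. The following is a minimal sketch based on the webSocketClientFlow attempt. The object name PeriodicPingSketch, the value names, and the 30 second interval are assumptions chosen for illustration and do not come from the blockchain.info documentation.

```scala
import akka.Done
import akka.actor.{ActorSystem, Cancellable}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical name, used only for this sketch.
object PeriodicPingSketch {
  val subscribe: TextMessage.Strict = TextMessage("""{"op":"unconfirmed_sub"}""")
  val ping: TextMessage.Strict = TextMessage("""{"op":"ping"}""")

  // Assumption: 30 seconds is an arbitrary interval, not a documented value.
  val pingInterval: FiniteDuration = 30.seconds

  // Subscribe once, then emit a ping every pingInterval.
  val outgoing: Source[Message, Cancellable] =
    Source.single[Message](subscribe)
      .concatMat(Source.tick[Message](pingInterval, pingInterval, ping))(Keep.right)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Prints strict text messages; a fuller version would also drain streamed ones.
    val incoming: Sink[Message, Future[Done]] = Sink.foreach[Message] {
      case strict: TextMessage.Strict => println(strict.text)
      case other                      => println(s"Ignoring non-strict message: $other")
    }

    val ((pingTask, upgradeResponse), closed) =
      outgoing
        .viaMat(Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv")))(Keep.both)
        .toMat(incoming)(Keep.both)
        .run()

    upgradeResponse.foreach(upgrade => println(s"Upgrade status: ${upgrade.response.status}"))
    closed.onComplete { _ => println("closed"); system.terminate() }
    // pingTask.cancel() stops the periodic pings.
  }
}
```

The materialized Cancellable (pingTask here) is the handle for stopping the pings later, for example on shutdown.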