Websocket proxy using Play 2.6 and akka streams

I am trying to create a simple proxy to connect to Websocket using the Play and akka streams. Stream traffic is as follows:

(Client) request -> -> request (Server) Proxy (Client) response <- <- response (Server) 

I gave the following code after the following examples:

 def socket = WebSocket.accept[String, String] { request => val uuid = UUID.randomUUID().toString // wsOut - actor that deals with incoming websocket frame from the Client // wsIn - publisher of the frame for the Server val (wsOut: ActorRef, wsIn: Publisher[String]) = { val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail) val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false) source.toMat(sink)(Keep.both).run() } // sink that deals with the incoming messages from the Server val serverIncoming: Sink[Message, Future[Done]] = Sink.foreach[Message] { case message: TextMessage.Strict => println("The server has sent: " + message.text) } // source for sending a message over the WebSocket val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_)) // flow to use (note: not re-usable!) val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000")) // the materialized value is a tuple with // upgradeResponse is a Future[WebSocketUpgradeResponse] that // completes or fails when the connection succeeds or fails // and closed is a Future[Done] with the stream completion from the incoming sink val (upgradeResponse, closed) = serverOutgoing .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse] .toMat(serverIncoming)(Keep.both) // also keep the Future[Done] .run() // just like a regular http request we can access response status which is available via upgrade.response.status // status code 101 (Switching Protocols) indicates that server support WebSockets val connected = upgradeResponse.flatMap { upgrade => if (upgrade.response.status == StatusCodes.SwitchingProtocols) { Future.successful(Done) } else { throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") } } // in a real application you would not side effect here connected.onComplete(println) closed.foreach(_ => println("closed")) val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid)) val finalFlow = { val sink = Sink.actorRef(actor, akka.actor.Status.Success(())) val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ??? Flow.fromSinkAndSource(sink, source) } finalFlow 

With this code, traffic flows from the client to the proxy to the server, back to the proxy and it. He does not reach the Client. How can i fix this? I think I need to somehow connect the serverIncoming shell to source in finalFlow , but I cannot figure out how to do this ...

Or do I completely disagree with this approach? Is it better to use Bidiflow or Graph ? I am new to aka streams and am still trying to figure things out.

+5
source share
3 answers

The following seems to work. Note. I implemented both a server socket and a proxy socket in one controller, but you can split them or deploy the same controller in separate instances. The ws URL for the "top" service will need to be updated in both cases.

 package controllers import javax.inject._ import akka.actor.{Actor, ActorRef, ActorSystem, Props} import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse} import akka.stream.Materializer import akka.stream.scaladsl.Flow import play.api.libs.streams.ActorFlow import play.api.mvc._ import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps @Singleton class SomeController @Inject()(implicit exec: ExecutionContext, actorSystem: ActorSystem, materializer: Materializer) extends Controller { /*--- proxy ---*/ def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket")) def proxySocket: WebSocket = WebSocket.accept[String, String] { _ => Flow[String].map(s => TextMessage(s)) .via(websocketFlow) .map(_.asTextMessage.getStrictText) } /*--- server ---*/ class UpperService(socket: ActorRef) extends Actor { override def receive: Receive = { case s: String => socket ! s.toUpperCase() case _ => } } object UpperService { def props(socket: ActorRef): Props = Props(new UpperService(socket)) } def upperSocket: WebSocket = WebSocket.accept[String, String] { _ => ActorFlow.actorRef(out => UpperService.props(out)) } } 

You will need routes that need to be configured as follows:

 GET /upper-socket controllers.SomeController.upperSocket GET /proxy-socket controllers.SomeController.proxySocket 

You can test by sending a line to ws: // localhost: 9000 / proxy-socket. The response will be an uppercase string.

After 1 minute of inactivity there will be a timeout:

 akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute 

But see: http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html on how to configure this.

+3
source

The proxy server must provide two streams (proxy stream A / B):

 (Client) request -> Proxy Flow A -> request (Server) (Client) response <- Proxy Flow B <- response (Server) 

One of the options for implementing such a proxy stream is to use ActorSubscriber and SourceQueue:

 class Subscriber[T](proxy: ActorRef) extends ActorSubscriber { private var queue = Option.empty[SourceQueueWithComplete[T]] def receive = { case Attach(sourceQueue) => queue = Some(sourceQueue) case msg: T => // wait until queue attached and pass forward all msgs to queue and the proxy actor } } def proxyFlow[T](proxy: ActorRef): Flow[T, ActorRef] = { val sink = Sink.actorSubscriber(Props(new Subscriber[T](proxy))) val source = Source.queue[T](...) Flow.fromSinkAndSourceMat(sink, source){ (ref, queue) => ref ! Attach(queue) ref } } 

Then you can build a customer flow, for example:

 val proxy = actorOf(...) val requestFlow = proxyFlow[Request](proxy) val responseFlow = proxyFlow[Response](proxy) val finalFlow: Flow[Request, Response] = requestFlow.via(webSocketFlow).via(responseFlow) 
+2
source

First of all, you need to import akka :

 import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.http.scaladsl.Http import akka.http.scaladsl.model.ws.WebSocketRequest import akka.http.scaladsl.model.ws.Message import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import akka.stream.scaladsl.Flow import akka.http.scaladsl.server.Directives.{ extractUpgradeToWebSocket, complete } 

This is an example of an App that creates a WebSocket proxy, binding to 0.0.0.0 on port 80 , proxying to ws://echo.websocket.org :

 object WebSocketProxy extends App { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() private[this] def manipulateFlow: Flow[Message, Message, akka.NotUsed] = ??? private[this] def webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://echo.websocket.org")) private[this] val route: Flow[HttpRequest, HttpResponse, Any] = extractUpgradeToWebSocket { upgrade => val webSocketFlowProxy = manipulateFlow via webSocketFlow val handleWebSocketProxy = upgrade.handleMessages(webSocketFlowProxy) complete(handleWebSocketProxy) } private[this] val proxyBindingFuture = Http().bindAndHandle(route, "0.0.0.0", 80) println(s"Server online\nPress RETURN to stop...") Console.readLine() } 

You need to adapt it for play and for your application structure.

Notes:

  • remember to cancel proxyBindingFuture and terminate the system during production;
  • you need to manipulateFlow only if you want to manipulate messages.
+2
source

Source: https://habr.com/ru/post/1266632/


All Articles