How to implement a simple TCP protocol using Akka Streams?

I took a stab at implementing a simple TCP messaging protocol with Akka Streams (see below). However, incoming messages do not seem to be processed immediately: when two messages are sent one after another from the client, the first message is printed only after something is sent from the server:

    At t=1, on [client] A is entered
    At t=2, on [client] B is entered
    At t=3, on [server] Z is entered
    At t=4, on [server] A is printed
    At t=5, on [server] Y is entered
    At t=6, on [server] B is printed

What I expected / want to see:

    At t=1, on [client] A is entered
    At t=2, on [server] A is printed
    At t=3, on [client] B is entered
    At t=4, on [server] B is printed
    At t=5, on [server] Z is entered
    At t=6, on [client] Z is printed
    At t=7, on [server] Y is entered
    At t=8, on [client] Y is printed

What am I missing? Do I need to somehow make the sinks on both ends impatient? Or is each receiver somehow blocked by the corresponding source while that source waits for input from the command line?

    import java.nio.charset.StandardCharsets.UTF_8

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
    import akka.util.ByteString
    import com.typesafe.config.ConfigFactory

    import scala.io.StdIn

    object AkkaStreamTcpChatter extends App {
      implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
      implicit val materializer = ActorMaterializer()

      type Message = String
      val (host, port) = ("localhost", 46235)

      val deserialize:ByteString => Message = _.utf8String
      val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)

      val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
      val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize

      val protocol = BidiFlow.fromFlows(incoming, outgoing)

      def prompt(s:String):Source[Message, _] = Source fromIterator {
        () => Iterator.continually(StdIn readLine s"[$s]> ")
      }

      val print:Sink[Message, _] = Sink foreach println

      args.headOption foreach {
        case "server" => server()
        case "client" => client()
      }

      def server():Unit =
        Tcp()
          .bind(host, port)
          .runForeach { _
            .flow
            .join(protocol)
            .runWith(prompt("S"), print)
          }

      def client():Unit =
        Tcp()
          .outgoingConnection(host, port)
          .join(protocol)
          .runWith(prompt("C"), print)
    }
1 answer

I think the problem is that Akka Streams performs operator fusion. This means the whole stream is processed by a single actor. While that actor is blocked waiting to read your input, it cannot print anything.

The solution is to add an asynchronous boundary after your source. See the example below.

    def server(): Unit =
      Tcp()
        .bind(host, port)
        .runForeach { _
          .flow
          .join(protocol)
          .runWith(prompt("S").async, print) // note .async here
        }

    def client(): Unit =
      Tcp()
        .outgoingConnection(host, port)
        .join(protocol).async
        .runWith(prompt("C").async, print) // note .async here

When you add an asynchronous boundary, fusion does not happen across it, so prompt runs on a separate actor and no longer blocks print from showing anything.
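To see the effect of the boundary without a TCP connection, here is a minimal sketch. It assumes the same Akka version and `ActorMaterializer` setup as the question. The names `AsyncBoundaryDemo`, `slowSource` and `timed` are made up for illustration, and `Thread.sleep` stands in for the blocking `StdIn.readLine` call. It only measures how long a linear pipeline takes with and without `.async`; it does not reproduce the bidirectional chat itself.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncBoundaryDemo extends App {
  implicit val system: ActorSystem = ActorSystem("async-boundary-demo")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Hypothetical stand-in for the blocking prompt: each element takes 200 ms to produce.
  def slowSource: Source[Int, _] = Source.fromIterator { () =>
    Iterator.from(1).map { n => Thread.sleep(200); n }
  }.take(3)

  // Runs the source through an equally slow downstream stage and
  // returns the elapsed wall-clock time in milliseconds.
  def timed(source: Source[Int, _]): Long = {
    val start = System.nanoTime()
    val done = source.map { n => Thread.sleep(200); n }.runWith(Sink.ignore)
    Await.result(done, 10.seconds)
    (System.nanoTime() - start) / 1000000
  }

  // Fused: source and map share one actor, so their blocking times add up.
  val fused = timed(slowSource)
  // With a boundary: the source runs on its own actor and overlaps with the map stage.
  val split = timed(slowSource.async)

  println(s"fused: $fused ms, with async boundary: $split ms")
  system.terminate()
}
```

The run with the boundary should finish noticeably sooner, because the blocking source no longer holds up the stages after it. Note that the source still blocks a dispatcher thread either way; the boundary only keeps it from stalling the rest of the stream.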


Source: https://habr.com/ru/post/1245836/

