Akka stream asyncBoundary vs mapAsync

I am trying to understand the difference between asyncBoundary and mapAsync. At first glance, I thought they should be the same. However, when I run the code below, the asyncBoundary version seems to perform faster than the mapAsync one.

Here is the code:

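 import akka.actor.ActorSystem
 import akka.stream.{ActorMaterializer, Attributes}
 import akka.stream.scaladsl.{Sink, Source}
 import scala.concurrent.Future

 implicit val system = ActorSystem("sourceDemo")
 implicit val materializer = ActorMaterializer()
 import system.dispatcher // ExecutionContext needed by the Futures below

 Source(1 to 100)
   .mapAsync(100)(t => Future { t + 1 })
   .mapAsync(100)(t => Future { t * 2 })
   .map(println)
   .to(Sink.ignore)
   .run()

 Source(1 to 100)
   .map(_ + 1)
   .withAttributes(Attributes.asyncBoundary)
   .map(_ * 2)
   .map(t => println("async boundary", t))
   .to(Sink.ignore)
   .run()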

Conclusion: the async boundary version always finishes faster than the mapAsync one.

From the documentation about asyncBoundary ( https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html ), I see that the stages run on different CPUs, whereas mapAsync is multi-threaded by means of Futures. But Futures are asynchronous too.

Could someone explain these two APIs in more detail?

1 answer

Async boundary

As you correctly point out, this forces the insertion of an asynchronous boundary between two stages. In your example

 Source(1 to 100)
   .map(_ + 1)
   .withAttributes(Attributes.asyncBoundary)
   .map(_ * 2)
   .map(t => println("async boundary", t))
   .to(Sink.ignore)
   .run()

this effectively means that the + 1 operation and the * 2 operation will be run by separate actors. This enables pipelining: while one element is going through the * 2 stage, another element can already enter the + 1 stage. Without the async boundary, a single actor runs the stages sequentially, completing both operations on one element before requesting a new one from upstream.
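The pipelining effect can be observed with a small timing sketch. It assumes Akka 2.6.x (classic Akka Streams), where an implicit ActorSystem supplies the Materializer; the object name PipeliningDemo, the slow helper and the 10 ms sleep are hypothetical, chosen only to make each stage expensive enough to measure. Blocking with Thread.sleep inside a stage is acceptable in a demo but not in production code, and the exact timings depend on the machine.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object PipeliningDemo {
  // Hypothetical "expensive" step: blocks the calling thread for 10 ms (demo only).
  private def slow(x: Int): Int = { Thread.sleep(10); x }

  // Evaluates `body` and returns its result with the elapsed wall-clock time in ms.
  private def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000)
  }

  // Returns (fused time in ms, split time in ms, whether both runs emitted the same elements).
  def compare(n: Int = 20): (Long, Long, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("pipelining")
    try {
      // Fused: one actor runs +1 and then *2 on each element before pulling the next one.
      val (fused, fusedMs) = timed(Await.result(
        Source(1 to n).map(x => slow(x + 1)).map(x => slow(x * 2)).runWith(Sink.seq),
        30.seconds))
      // Split by .async: the two stages run in separate actors, so their work can overlap.
      val (split, splitMs) = timed(Await.result(
        Source(1 to n).map(x => slow(x + 1)).async.map(x => slow(x * 2)).runWith(Sink.seq),
        30.seconds))
      (fusedMs, splitMs, fused == split)
    } finally {
      system.terminate()
    }
  }
}
```

With 20 elements and two 10 ms stages, the fused run needs roughly 20 × 20 ms, while the split run should approach 20 × 10 ms because the stages overlap; the emitted elements are the same in both cases.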

By the way, your example can be written more concisely using the async combinator:

 Source(1 to 100)
   .map(_ + 1)
   .async
   .map(_ * 2)
   .map(t => println("async boundary", t))
   .to(Sink.ignore)
   .run()
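A quick way to convince yourself that the two spellings behave the same is to collect both outputs and compare them. This is a sketch assuming Akka 2.6.x, where an implicit ActorSystem provides the Materializer; AsyncShorthandDemo is a hypothetical name. The async combinator adds the Attributes.asyncBoundary attribute, and neither form changes which elements are emitted or their order.

```scala
import akka.actor.ActorSystem
import akka.stream.Attributes
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object AsyncShorthandDemo {
  // Runs the same pipeline with both spellings of the boundary and returns both outputs.
  def results(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("async-shorthand")
    try {
      // Explicit attribute, as in the question.
      val viaAttributes = Source(1 to 100)
        .map(_ + 1)
        .withAttributes(Attributes.asyncBoundary)
        .map(_ * 2)
        .runWith(Sink.seq)
      // Shorthand combinator.
      val viaAsync = Source(1 to 100)
        .map(_ + 1)
        .async
        .map(_ * 2)
        .runWith(Sink.seq)
      (Await.result(viaAttributes, 10.seconds), Await.result(viaAsync, 10.seconds))
    } finally {
      system.terminate()
    }
  }
}
```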

mapAsync

This is a stage for parallelizing asynchronous operations. The parallelism factor lets you specify the maximum number of Futures that may be in flight at the same time to serve incoming elements. The stage keeps track of the results of these parallel computations and emits them downstream in the original order.
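The ordering guarantee can be seen by making later elements finish first. The sketch below assumes Akka 2.6.x and uses akka.pattern.after to complete each Future after a delay without blocking a thread; MapAsyncOrderingDemo and delayed are hypothetical names. Element 1 waits about 200 ms and element 10 about 20 ms, so the Futures tend to complete in reverse, yet mapAsync still emits 1 to 10 in order.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrderingDemo {
  // Returns (order in which mapAsync emitted elements, order in which their Futures completed).
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("ordering")
    import system.dispatcher // ExecutionContext for the Futures
    val completed = ArrayBuffer.empty[Int]
    // Hypothetical async operation: smaller x waits longer, so later elements finish first.
    def delayed(x: Int): Future[Int] =
      after((20 * (11 - x)).millis, system.scheduler)(Future {
        completed.synchronized { completed += x }
        x
      })
    try {
      val emitted = Await.result(
        Source(1 to 10).mapAsync(parallelism = 10)(delayed).runWith(Sink.seq),
        10.seconds)
      (emitted, completed.synchronized(completed.toList))
    } finally {
      system.terminate()
    }
  }
}
```

If emission order does not matter, Akka also provides mapAsyncUnordered, which passes results on as soon as each Future completes.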

In your example

 Source(1 to 100)
   .mapAsync(100)(t => Future { t + 1 })
   .mapAsync(100)(t => Future { t * 2 })
   .map(println)
   .to(Sink.ignore)
   .run()

potentially up to 100 of the + 1 operations (i.e., all of them) can run in parallel, and their results are collected in order. After that, up to 100 of the * 2 operations can run in parallel, and again the results are collected in order and emitted downstream.
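The "up to" in the paragraph above is a hard upper bound, which a counter of in-flight Futures makes visible. This is a sketch assuming Akka 2.6.x; MapAsyncParallelismDemo, tracked and the 20 ms delay are hypothetical. The counter is incremented when a Future is created and decremented inside it just before it completes, so the highest value seen should never exceed the parallelism argument.

```scala
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncParallelismDemo {
  // Returns the highest number of Futures observed in flight at the same time.
  def maxInFlight(parallelism: Int, n: Int = 50): Int = {
    implicit val system: ActorSystem = ActorSystem("parallelism")
    import system.dispatcher // ExecutionContext for the Futures
    val inFlight = new AtomicInteger(0)
    val maxSeen = new AtomicInteger(0)
    // Hypothetical async operation that records how many calls are currently running.
    def tracked(x: Int): Future[Int] = {
      val now = inFlight.incrementAndGet()
      maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
      after(20.millis, system.scheduler)(Future {
        inFlight.decrementAndGet()
        x + 1
      })
    }
    try {
      Await.result(
        Source(1 to n).mapAsync(parallelism)(tracked).runWith(Sink.ignore),
        30.seconds)
      maxSeen.get()
    } finally {
      system.terminate()
    }
  }
}
```

With parallelism 1 the stage degenerates to one Future at a time; with a larger value it keeps several outstanding, but never more than requested.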

In your example you use fast, CPU-bound operations that do not justify mapAsync: the machinery this stage needs (creating, scheduling and tracking a Future per element) is most likely much more expensive than the benefit of running 100 of these operations concurrently. mapAsync is especially useful for slow, IO-bound operations, where parallelization really pays off.
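To illustrate the IO-bound case, the sketch below replaces the cheap arithmetic with a hypothetical fakeIoCall that completes after 50 ms without blocking a thread, standing in for something like an HTTP request or database query. It assumes Akka 2.6.x; IoBoundDemo and fakeIoCall are hypothetical names, and the timings depend on the machine. With 16 elements, parallelism 1 should take roughly 16 × 50 ms, while parallelism 8 lets the waits overlap.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object IoBoundDemo {
  // Hypothetical stand-in for a slow, non-blocking IO call.
  def fakeIoCall(x: Int)(implicit system: ActorSystem): Future[Int] = {
    import system.dispatcher // ExecutionContext required by `after`
    after(50.millis, system.scheduler)(Future.successful(x * 2))
  }

  // Processes n elements through mapAsync and returns the elapsed wall-clock time in ms.
  def elapsedMs(parallelism: Int, n: Int = 16): Long = {
    implicit val system: ActorSystem = ActorSystem("io-bound")
    try {
      val start = System.nanoTime()
      Await.result(
        Source(1 to n).mapAsync(parallelism)(x => fakeIoCall(x)).runWith(Sink.ignore),
        30.seconds)
      (System.nanoTime() - start) / 1000000
    } finally {
      system.terminate()
    }
  }
}
```

For the cheap + 1 and * 2 operations in the question, a plain map (optionally split with .async) avoids the per-element Future overhead entirely.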

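Also note that mapAsync by its nature introduces an asynchronous boundary of sorts, since the work wrapped in each Future runs on an ExecutionContext rather than inside the stream's own actor. You might therefore think of it as an "extension" of async where you can specify a parallelism greater than 1.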


Source: https://habr.com/ru/post/1273381/

