High CPU utilization with context switches in Akka app

I maintain and develop two Akka Scala applications that talk to a serial device to collect sensor data. The main difference between the two is that one (my CO2 sensor application) uses 1% CPU, while the other (my power sensor application) uses 250% CPU. This happens both on a Linux machine (Raspberry Pi 3) and on my Windows PC. Code-wise, the main difference is that the CO2 application uses the serial library ( http://fazecast.imtqy.com/jSerialComm/ ) directly, while the power sensor application goes through a middleware layer that converts the serial library's InputStream/OutputStream into an Akka Source/Sink, like this:

    val port = SerialPort.getCommPort(comPort)
    port.setBaudRate(baudRate)
    port.setFlowControl(flowControl)
    port.setComPortParameters(baudRate, dataBits, stopBits, parity)
    port.setComPortTimeouts(timeoutMode, timeout, timeout)

    val isOpen = port.openPort()

    if(!isOpen) {
      error(s"Port $comPort could not opened. Use the following documentation for troubleshooting: https://github.com/Fazecast/jSerialComm/wiki/Troubleshooting")
      throw new Exception("Port could not be opened")
    }

    (reactive.streamSource(port.getInputStream), reactive.streamSink(port.getOutputStream))
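The `reactive.streamSource` and `reactive.streamSink` helpers are not shown in the question. The following is only a sketch of what such helpers might look like, assuming they are thin wrappers around Akka Streams' `StreamConverters`. The object name `ReactiveSketch` and the `autoFlush = true` choice are hypothetical, and the code needs akka-stream on the classpath.

```scala
import java.io.{InputStream, OutputStream}

import akka.stream.IOResult
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Future

// Hypothetical stand-in for the `reactive` helpers used in the question.
object ReactiveSketch {

  // Wraps an InputStream as a Source of ByteString chunks.
  // The stream keeps calling read() on the InputStream while there is demand,
  // so the blocking behaviour of read() directly affects CPU usage.
  def streamSource(in: InputStream): Source[ByteString, Future[IOResult]] =
    StreamConverters.fromInputStream(() => in)

  // Wraps an OutputStream as a Sink that writes every incoming ByteString.
  // autoFlush = true is an assumption: it flushes after each write, which
  // suits small request packets sent to a serial device.
  def streamSink(out: OutputStream): Sink[ByteString, Future[IOResult]] =
    StreamConverters.fromOutputStream(() => out, autoFlush = true)
}
```

With helpers like these, the last line of the snippet above would become `(ReactiveSketch.streamSource(port.getInputStream), ReactiveSketch.streamSink(port.getOutputStream))`.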

When I saw this high CPU usage, I immediately ran a profiler (VisualVM) against it, which showed the following: [Profiler screenshot]

After searching for Unsafe.park, I found the following answer: https://stackoverflow.com/a/212960/167/ Using that information, I checked the number of context switches with and without my power sensor application running, and the results pointed clearly at the root cause of the problem:

    pi@dex:~ $ vmstat 1
    procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
     r  b   swpd   free   buff  cache   si   so    bi    bo    in    cs us sy id wa st
    10  0  32692  80144  71228 264356    0    0     0     5     7     8 38  5 55  2  0
     1  0  32692  80176  71228 264356    0    0     0    76 12932 18856 59  6 35  0  0
     1  0  32692  80208  71228 264356    0    0     0     0 14111 20570 60  8 32  0  0
     1  0  32692  80208  71228 264356    0    0     0     0 13186 16095 65  6 29  0  0
     1  0  32692  80176  71228 264356    0    0     0     0 14008 23449 56  6 38  0  0
     3  0  32692  80208  71228 264356    0    0     0     0 13528 17783 65  6 29  0  0
     1  0  32692  80208  71228 264356    0    0     0    28 12960 16588 63  6 31  0  0

    pi@dex:~ $ vmstat 1
    procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
     r  b   swpd   free   buff  cache   si   so    bi    bo    in    cs us sy id wa st
     1  0  32692 147320  71228 264332    0    0     0     5     7     8 38  5 55  2  0
     0  0  32692 147296  71228 264332    0    0     0    84   963  1366  0  0 98  2  0
     0  0  32692 147296  71228 264332    0    0     0     0   962  1347  1  0 99  0  0
     0  0  32692 147296  71228 264332    0    0     0     0   947  1318  1  0 99  0  0

As you can see, the number of context switches dropped by roughly 12000 per second just by killing my application. I went on to check exactly which threads were responsible, and it seems that Akka really wants to do something: [Profiler threads screenshot]

Both a comment here and another SO question pointed towards tuning Akka's parallelism settings. I added the following to my application.conf, but it did not solve the problem.

    akka {
      log-config-on-start = "on"
      actor {
        default-dispatcher {
          # Dispatcher is the name of the event-based dispatcher
          type = Dispatcher
          # What kind of ExecutionService to use
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          default-executor {
            fallback = "fork-join-executor"
          }
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 1
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 1.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 1
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
      stream {
        default-blocking-io-dispatcher {
          type = PinnedDispatcher
          executor = "fork-join-executor"
          throughput = 1
          thread-pool-executor {
            core-pool-size-min = 1
            core-pool-size-factor = 1.0
            core-pool-size-max = 1
          }
          fork-join-executor {
            parallelism-min = 1
            parallelism-factor = 1.0
            parallelism-max = 1
          }
        }
      }
    }

This does seem to improve CPU utilization (100% → 65%), but CPU usage is still unreasonably high.

UPDATE 21-11-16: It would seem that the problem is inside my graph. When the graph is not run, CPU usage immediately drops to a normal level. The graph is as follows:

    val streamGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val responsePacketSource = serialSource
        .via(Framing.delimiter(ByteString(frameDelimiter), maxFrameLength, allowTruncation = true))
        .via(cleanPacket)
        .via(printOutput("Received: ",debug(_)))
        .via(byteStringToResponse)

      val packetSink = pushSource
        .via(throttle(throttle))

      val zipRequestStickResponse = builder.add(Zip[RequestPacket, ResponsePacket])
      val broadcastRequest = builder.add(Broadcast[RequestPacket](2))
      val broadcastResponse = builder.add(Broadcast[ResponsePacket](2))

      packetSink ~> broadcastRequest.in
      broadcastRequest.out(0) ~> makePacket ~> printOutput("Sent: ",debug(_)) ~> serialSink
      broadcastRequest.out(1) ~> zipRequestStickResponse.in0

      responsePacketSource ~> broadcastResponse.in
      broadcastResponse.out(0).filter(isStickAck) ~> zipRequestStickResponse.in1
      broadcastResponse.out(1).filter(!isStickAck(_)).map (al => {
        val e = completeRequest(al)
        debug(s"Sinking: $e")
        e
      }) ~> Sink.ignore

      zipRequestStickResponse.out.map { case(request, stickResponse) =>
        debug(s"Mapping: request=$request, stickResponse=$stickResponse")
        pendingPackets += stickResponse.sequenceNumber -> request
        request.stickResponse trySuccess stickResponse
      } ~> Sink.ignore

      ClosedShape
    })

    streamGraph.run()

When I remove the filters from broadcastResponse, CPU usage drops to normal. This makes me think that the zip never happens, and therefore the graph ends up in an incorrect state.

1 answer

The problem is that the Fazecast jSerialComm library has several different timeout modes.

    static final public int TIMEOUT_NONBLOCKING         = 0x00000000;
    static final public int TIMEOUT_READ_SEMI_BLOCKING  = 0x00000001;
    static final public int TIMEOUT_WRITE_SEMI_BLOCKING = 0x00000010;
    static final public int TIMEOUT_READ_BLOCKING       = 0x00000100;
    static final public int TIMEOUT_WRITE_BLOCKING      = 0x00001000;
    static final public int TIMEOUT_SCANNER             = 0x00010000;

Using the non-blocking read() method ( TIMEOUT_NONBLOCKING ) results in very high CPU usage in conjunction with the Akka InputStreamPublisher stream. To prevent this, simply use TIMEOUT_READ_SEMI_BLOCKING or TIMEOUT_READ_BLOCKING .
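As a rough illustration of that change, here is a minimal sketch that opens a port in semi-blocking read mode before its streams are handed to Akka. The object name `SemiBlockingPort`, the `open` method and its parameters are hypothetical, and the code needs jSerialComm on the classpath. As far as I understand the jSerialComm documentation, a semi-blocking read waits until at least one byte is available or the timeout expires, so the reading thread sleeps instead of spinning.

```scala
import com.fazecast.jSerialComm.SerialPort

// Hypothetical helper, not taken from the question's code base.
object SemiBlockingPort {

  // The read mode recommended in the answer. TIMEOUT_READ_BLOCKING would be
  // the other option named there.
  val readMode: Int = SerialPort.TIMEOUT_READ_SEMI_BLOCKING

  // Opens `comPort` with a semi-blocking read timeout.
  // `timeoutMs` is used for both the read and the write timeout, mirroring
  // the `timeout, timeout` arguments in the question.
  def open(comPort: String, baudRate: Int, timeoutMs: Int): SerialPort = {
    val port = SerialPort.getCommPort(comPort)
    port.setBaudRate(baudRate)
    // The important line: avoid TIMEOUT_NONBLOCKING for the read side.
    port.setComPortTimeouts(readMode, timeoutMs, timeoutMs)
    if (!port.openPort()) {
      throw new IllegalStateException(s"Port $comPort could not be opened")
    }
    port
  }
}
```

In the question's code this would correspond to passing `SerialPort.TIMEOUT_READ_SEMI_BLOCKING` as `timeoutMode` in the existing `port.setComPortTimeouts(timeoutMode, timeout, timeout)` call.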


Source: https://habr.com/ru/post/1259962/

