Scala actor inefficiency problem

Let me start by saying that I am new to Scala; however, I find its actor-based concurrency model interesting, and I tried to give it a chance for a relatively simple application. The problem I'm facing is that although I can get the application to work, the result is far less efficient (in terms of real time, CPU time, and memory usage) than an equivalent Java solution that uses threads passing messages through an ArrayBlockingQueue. I would like to understand why. I suspect it is probably my lack of Scala knowledge, and that I am causing all of the inefficiency myself, but after several attempts to rework the application without success, I decided to turn to the community for help.

My problem is this: I have a gzipped file with many lines in the format:

SomeID<tab>comma_separated_list_of_values

For instance:

1234<tab>12,45,82

I would like to parse each line and get a total count of the number of occurrences of each value in the comma-separated list. For example, the two lines 1234<tab>12,45,82 and 1235<tab>12,82 would yield the counts 12: 2, 45: 1, 82: 2.

This file can be quite large (several GB compressed), but the number of unique values per file is quite small (no more than 500). I figured this would be a good opportunity to try writing a concurrent, actor-based Scala application. My solution has a main driver that creates a pool of parser actors. The main driver then reads lines from stdin and hands each line off to an actor, which parses the line and keeps a local count of the values. When the main driver has read the last line, it sends a message to each parser indicating that all lines have been read. When a parser receives the "done" message, it sends its counts to an aggregator, which sums the counts from all of the parsers. Once the counts from all parsers have been aggregated, the main driver prints the statistics.

Problem: The main problem I am facing is how incredibly inefficient this application is. It uses far more CPU and far more memory than the "equivalent" Java application that uses threads and an ArrayBlockingQueue. To put this in perspective, here are some statistics I collected for a 10-million-line test input file:

Scala, 1 actor (parser):

    real 9m22.297s   user 235m31.070s   sys 21m51.420s

Java, 1 thread (parser):

    real 1m48.275s   user 1m58.630s   sys 0m33.540s

Scala, 5 actors:

    real 2m25.267s   user 63m0.730s   sys 3m17.950s

Java, 5 threads (parsers):

    real 0m24.961s   user 1m52.650s   sys 0m20.920s

In addition, top reports that the Scala application has a resident memory footprint roughly 10x larger. So we are talking about orders of magnitude more CPU and memory here, and I just can't figure out what is causing it. Is this a GC problem, or am I somehow creating far more copies of objects than I realize?
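
For reference, the GC behavior of the two versions can be compared directly by enabling the JVM's standard GC logging, e.g. gunzip -c gzFilename | java -verbose:gc -jar StatParser.jar, and diffing the collection frequency and pause times.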

Additional information that may or may not be important:

  • The Scala application is wrapped in a Java class so that I can deliver a stand-alone executable JAR file (I do not have Scala installed on every machine where this application may run).
  • The application is invoked as follows: gunzip -c gzFilename | java -jar StatParser.jar

Here is the code:

Main driver:

    import scala.actors.Actor._
    import scala.collection.{ immutable, mutable }
    import scala.io.Source

    class StatCollector( numParsers: Int ) {
      private val parsers = new mutable.ArrayBuffer[StatParser]()
      private val aggregator = new StatAggregator()

      def generateParsers {
        for ( i <- 1 to numParsers ) {
          val parser = new StatParser( i, aggregator )
          parser.start
          parsers += parser
        }
      }

      def readStdin {
        var nextParserIdx = 0
        var lineNo = 1
        for ( line <- Source.stdin.getLines() ) {
          parsers( nextParserIdx ) ! line
          nextParserIdx += 1
          if ( nextParserIdx >= numParsers ) {
            nextParserIdx = 0
          }
          lineNo += 1
        }
      }

      def informParsers {
        for ( parser <- parsers ) {
          parser ! true
        }
      }

      def printCounts {
        val countMap = aggregator.getCounts()
        println( "ID,Count" )
        /*
        for ( key <- countMap.keySet ) {
          println( key + "," + countMap.getOrElse( key, 0 ) )
          //println( "Campaign '" + key + "': " + countMap.getOrElse( key, 0 ) )
        }
        */
        countMap.toList.sorted foreach {
          case (key, value) => println( key + "," + value )
        }
      }

      def processFromStdIn {
        aggregator.start
        generateParsers
        readStdin
        process
      }

      def process {
        informParsers
        var completedParserCount = aggregator.getNumParsersAggregated
        while ( completedParserCount < numParsers ) {
          Thread.sleep( 250 )
          completedParserCount = aggregator.getNumParsersAggregated
        }
        printCounts
      }
    }
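
The Java wrapper class itself is not shown; for completeness, it amounts to something like the following sketch (the object name, argument handling, and default of 5 parsers are illustrative assumptions, not the actual wrapper):

    object StatParserMain {
      // Illustrative entry point only; the real Java wrapper is not shown here.
      def main( args: Array[String] ) {
        val numParsers = if ( args.nonEmpty ) args(0).toInt else 5
        new StatCollector( numParsers ).processFromStdIn
      }
    }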

Actor Parser:

    import scala.actors.Actor
    import collection.mutable.HashMap
    import scala.util.matching

    class StatParser( val id: Int, val aggregator: StatAggregator ) extends Actor {
      private var countMap = new HashMap[String, Int]()
      private val sep1 = "\t"
      private val sep2 = ","

      def getCounts(): HashMap[String, Int] = {
        return countMap
      }

      def act() {
        loop {
          react {
            case line: String => {
              val idx = line.indexOf( sep1 )
              var currentCount = 0
              if ( idx > 0 ) {
                val tokens = line.substring( idx + 1 ).split( sep2 )
                for ( token <- tokens ) {
                  if ( !token.equals( "" ) ) {
                    currentCount = countMap.getOrElse( token, 0 )
                    countMap( token ) = ( 1 + currentCount )
                  }
                }
              }
            }
            case doneProcessing: Boolean => {
              if ( doneProcessing ) {
                // Send my stats to Aggregator
                aggregator ! this
              }
            }
          }
        }
      }
    }

Aggregator Actor:

    import scala.actors.Actor
    import collection.mutable.HashMap

    class StatAggregator extends Actor {
      private var countMap = new HashMap[String, Int]()
      private var parsersAggregated = 0

      def act() {
        loop {
          react {
            case parser: StatParser => {
              val cm = parser.getCounts()
              for ( key <- cm.keySet ) {
                val currentCount = countMap.getOrElse( key, 0 )
                val incAmt = cm.getOrElse( key, 0 )
                countMap( key ) = ( currentCount + incAmt )
              }
              parsersAggregated += 1
            }
          }
        }
      }

      def getNumParsersAggregated: Int = {
        return parsersAggregated
      }

      def getCounts(): HashMap[String, Int] = {
        return countMap
      }
    }

Any help that can be offered in understanding what is going on here would be greatly appreciated.

Thanks in advance!

---- Edit ----

Since many people answered and asked for the Java code, here is the simple Java application I created for comparison. I realize this is not great Java code; when I saw the performance of the Scala application, I just threw something together quickly to see how a Java thread-based implementation would perform as a baseline:

Parser thread:

    import java.util.Hashtable;
    import java.util.Map;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class JStatParser extends Thread {
        private ArrayBlockingQueue<String> queue;
        private Map<String, Integer> countMap;
        private boolean done;

        public JStatParser( ArrayBlockingQueue<String> q ) {
            super();
            queue = q;
            countMap = new Hashtable<String, Integer>();
            done = false;
        }

        public Map<String, Integer> getCountMap() {
            return countMap;
        }

        public void alldone() {
            done = true;
        }

        @Override
        public void run() {
            String line = null;
            while( !done || queue.size() > 0 ) {
                try {
                    // line = queue.take( );
                    line = queue.poll( 100, TimeUnit.MILLISECONDS );
                    if( line != null ) {
                        int idx = line.indexOf( "\t" ) + 1;
                        for( String token : line.substring( idx ).split( "," ) ) {
                            if( !token.equals( "" ) ) {
                                if( countMap.containsKey( token ) ) {
                                    Integer currentCount = countMap.get( token );
                                    currentCount++;
                                    countMap.put( token, currentCount );
                                } else {
                                    countMap.put( token, new Integer( 1 ) );
                                }
                            }
                        }
                    }
                } catch( InterruptedException e ) {
                    // TODO Auto-generated catch block
                    System.err.println( "Failed to get something off the queue: "
                            + e.getMessage() );
                    e.printStackTrace();
                }
            }
        }
    }

Driver:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Hashtable;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;
    import java.util.concurrent.ArrayBlockingQueue;

    public class JPS {
        public static void main( String[] args ) {
            if( args.length <= 0 || args.length > 2 || args[0].equals( "-?" ) ) {
                System.err.println( "Usage: JPS [filename]" );
                System.exit( -1 );
            }

            int numParsers = Integer.parseInt( args[0] );
            ArrayBlockingQueue<String> q = new ArrayBlockingQueue<String>( 1000 );
            List<JStatParser> parsers = new ArrayList<JStatParser>();
            BufferedReader reader = null;
            try {
                if( args.length == 2 ) {
                    reader = new BufferedReader( new FileReader( args[1] ) );
                } else {
                    reader = new BufferedReader( new InputStreamReader( System.in ) );
                }

                for( int i = 0; i < numParsers; i++ ) {
                    JStatParser parser = new JStatParser( q );
                    parser.start();
                    parsers.add( parser );
                }

                String line = null;
                while( (line = reader.readLine()) != null ) {
                    try {
                        q.put( line );
                    } catch( InterruptedException e ) {
                        // TODO Auto-generated catch block
                        System.err.println( "Failed to add line to q: " + e.getMessage() );
                        e.printStackTrace();
                    }
                }

                // At this point, we've put everything on the queue, now we just
                // need to wait for it to be processed.
                while( q.size() > 0 ) {
                    try {
                        Thread.sleep( 250 );
                    } catch( InterruptedException e ) {
                    }
                }

                Map<String, Integer> countMap = new Hashtable<String, Integer>();
                for( JStatParser jsp : parsers ) {
                    jsp.alldone();
                    Map<String, Integer> cm = jsp.getCountMap();
                    for( String key : cm.keySet() ) {
                        if( countMap.containsKey( key ) ) {
                            Integer currentCount = countMap.get( key );
                            currentCount += cm.get( key );
                            countMap.put( key, currentCount );
                        } else {
                            countMap.put( key, cm.get( key ) );
                        }
                    }
                }

                System.out.println( "ID,Count" );
                for( String key : new TreeSet<String>( countMap.keySet() ) ) {
                    System.out.println( key + "," + countMap.get( key ) );
                }

                for( JStatParser parser : parsers ) {
                    try {
                        parser.join( 100 );
                    } catch( InterruptedException e ) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }

                System.exit( 0 );
            } catch( IOException e ) {
                System.err.println( "Caught exception: " + e.getMessage() );
                e.printStackTrace();
            }
        }
    }
---- Answer ----

I'm not sure this is a good test case for actors. For one thing, there is practically no interaction between the actors. This is simple map/reduce, which calls for parallelism, not concurrency.
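
To illustrate: the whole job can be expressed without actors at all. A rough, untested sketch using Scala 2.9 parallel collections (the object name, helper names, and chunk size are mine, not from the question):

    import scala.io.Source

    object ParStat {
      // Merge two immutable count maps, summing counts for shared keys.
      private def merge( a: Map[String, Int], b: Map[String, Int] ) =
        b.foldLeft( a ) { case ( acc, (k, v) ) =>
          acc.updated( k, acc.getOrElse( k, 0 ) + v )
        }

      // Fold one line's tokens into an accumulator map.
      private def countLine( acc: Map[String, Int], line: String ) = {
        val idx = line.indexOf( '\t' )
        if ( idx > 0 )
          line.substring( idx + 1 ).split( ',' ).filter( _.nonEmpty )
            .foldLeft( acc ) { (m, t) => m.updated( t, m.getOrElse( t, 0 ) + 1 ) }
        else acc
      }

      def main( args: Array[String] ) {
        val total = Source.stdin.getLines().grouped( 10000 )
          .map( chunk => chunk.par.aggregate( Map.empty[String, Int] )( countLine, merge ) )
          .foldLeft( Map.empty[String, Int] )( merge )
        println( "ID,Count" )
        total.toList.sorted foreach { case (k, v) => println( k + "," + v ) }
      }
    }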

The overhead of the actors is also quite heavy, and I don't know how many actual threads are being allocated. Depending on how many processors you have, you may end up with fewer threads than the Java program has, which seems to be the case given that the speed-up is 4x instead of 5x.

Also, the way you wrote the actors is optimized for idling actors, i.e. situations where you have hundreds or thousands of actors but only a few of them are doing actual work at any given time. If you wrote the actors with while/receive instead of loop/react, they would perform better; a sketch follows.
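
For example, the parser's act method could be rewritten along these lines (an untested sketch that reuses StatParser's existing countMap, sep1, sep2, and aggregator fields; the done handoff is kept as in the original):

    // Sketch: a thread-bound receive loop instead of loop/react.
    // receive blocks a dedicated thread per actor, avoiding the
    // task-scheduling overhead when a few actors are busy nonstop.
    def act() {
      var done = false
      while ( !done ) {
        receive {
          case line: String =>
            val idx = line.indexOf( sep1 )
            if ( idx > 0 ) {
              for ( token <- line.substring( idx + 1 ).split( sep2 ) if token != "" ) {
                countMap( token ) = countMap.getOrElse( token, 0 ) + 1
              }
            }
          case doneProcessing: Boolean =>
            if ( doneProcessing ) {
              aggregator ! this   // same handoff as the original code
              done = true
            }
        }
      }
    }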

Now, actors would make it easy to distribute the application across many computers, except that you have violated one of the tenets of actors: you are calling methods on the actor objects. You should never do that with actors; in fact, Akka prevents you from doing it. A more actor-ish way of doing this would be for the aggregator to ask each actor for its key sets, compute their union, and then, for each key, ask every actor to send its count for that key. A simpler variant is sketched below.
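
Short of that full protocol, a smaller step in the same spirit is to have each parser send an immutable snapshot of its counts as a message, instead of a reference to itself, so the aggregator never reads another actor's mutable state. A sketch (the ParserDone case class is my invention):

    // Hypothetical message carrying an immutable snapshot of one parser's counts.
    case class ParserDone( id: Int, counts: Map[String, Int] )

    // In StatParser, on the done message, instead of `aggregator ! this`:
    //   aggregator ! ParserDone( id, countMap.toMap )

    // In StatAggregator:
    def act() {
      loop {
        react {
          case ParserDone( _, counts ) =>
            for ( (key, count) <- counts ) {
              countMap( key ) = countMap.getOrElse( key, 0 ) + count
            }
            parsersAggregated += 1
        }
      }
    }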

I'm not sure, however, that the overhead of the actors is what you are seeing. You have provided no information about the Java implementation, but I daresay you are using mutable maps, and perhaps even a single concurrent mutable map, which would be a completely different implementation from what you are doing in Scala.

There is also no information about how the file is read (a file that large may well have buffering issues) or how it is parsed in Java. Since most of the work is reading and parsing the file, not counting the tokens, differences in that part of the implementation can easily outweigh any other issue.
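
If buffering does turn out to matter, one low-risk experiment (an assumption on my part; the question only shows Source.stdin on the Scala side) is to read stdin through an explicitly, generously buffered reader:

    import java.io.{ BufferedReader, InputStreamReader }

    // Replace Source.stdin.getLines() with an explicitly buffered reader;
    // Iterator.continually turns readLine into a line iterator.
    val reader = new BufferedReader( new InputStreamReader( System.in ), 1 << 20 )
    val lines  = Iterator.continually( reader.readLine() ).takeWhile( _ != null )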

Finally, regarding resident memory size: Scala carries a 9 MB library (on top of what the JVM brings), which may be part of what you are seeing. And, of course, if you are using a single concurrent map in Java versus six immutable maps in Scala (one per parser plus the aggregator), that will certainly make a big difference in memory usage patterns.
