Spark - How to reduce the shuffle size of a JavaPairRDD<Integer, Integer[]>?
I have a JavaPairRDD<Integer, Integer[]> on which I want to execute the groupByKey action.
The groupByKey action gives me:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing output location for shuffling
which is practically an OutOfMemory error, if I'm not mistaken. It only happens on large datasets (in my case, when the "Shuffle Write" shown in the web UI is ~96 GB).
I have set:
spark.serializer org.apache.spark.serializer.KryoSerializer
in $SPARK_HOME/conf/spark-defaults.conf, but I'm not sure whether Kryo is actually used to serialize my JavaPairRDD.
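For completeness, the equivalent programmatic way to set this, and to additionally register custom classes with Kryo, would look roughly like the sketch below (the app and class names are made up):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class KryoConfSketch {
        public static void main(String[] args) {
            // Same setting as the spark-defaults.conf line, applied programmatically.
            SparkConf conf = new SparkConf()
                    .setAppName("kryo-conf-sketch")
                    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                    // Registering the classes that actually travel through the shuffle
                    // avoids Kryo writing full class names with every record.
                    .registerKryoClasses(new Class<?>[]{Integer[].class, int[].class});

            JavaSparkContext sc = new JavaSparkContext(conf);
            // ... build and use the RDDs here ...
            sc.stop();
        }
    }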
Is there anything else I need to do to use Kryo for serializing my RDD, besides setting this conf parameter? I see in the serialization documentation that:
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the Twitter chill library.
And also that:
Since Spark 2.0.0, we internally use Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
I also noticed that when I set spark.serializer to Kryo, the Shuffle Write in the web UI increases from ~96 GB (with the default serializer) to 243 GB!
EDIT: In a comment I was asked about the logic of my program, in case groupByKey can be replaced with reduceByKey. I don't think it's possible, but here it is anyway:
The input has the form:
- key: bucket id
- value: Integer array of the entity ids in this bucket
The shuffle write operation creates pairs of the form:
- key: entity id
- value: Integer array of all the entity ids in the same bucket (call them neighbors)
The groupByKey operation gathers, for each entity, all its neighbor arrays, some of which occur more than once (the entity appears in many buckets). After the groupByKey operation, I keep a weight for each bucket (based on the number of negative entity ids it contains), and for each neighbor id I sum up the weights of the buckets it belongs to. I normalize the score of each neighbor id by another value (let's say it is given) and emit the top-3 neighbors per entity (a rough sketch of this pipeline is shown at the end of this edit).
The number of distinct keys I get is about 10 million (about 5 million positive entities and 5 million negative ones).
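To make the above concrete, here is a rough, hypothetical sketch of the first part of this pipeline in Spark's Java API (the names are made up, and the weighting/normalization step is omitted):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import scala.Tuple2;

    public class NeighborPipelineSketch {
        // Input: (bucket id -> entity ids). Output: (entity id -> all neighbor arrays it appears in).
        public static JavaPairRDD<Integer, Iterable<Integer[]>> neighborsPerEntity(
                JavaPairRDD<Integer, Integer[]> buckets) {
            return buckets
                    .flatMapToPair(bucket -> {
                        Integer[] ids = bucket._2();
                        List<Tuple2<Integer, Integer[]>> pairs = new ArrayList<>(ids.length);
                        for (Integer id : ids) {
                            // every entity in the bucket gets the whole bucket as its neighbor array
                            pairs.add(new Tuple2<>(id, ids));
                        }
                        return pairs.iterator();
                    })
                    .groupByKey(); // this is the step whose shuffle write blows up
        }
    }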
EDIT 2: I tried using Hadoop's Writables (VIntWritable and a VIntArrayWritable extending ArrayWritable) instead of Integer and Integer[] respectively, but the shuffle size was even bigger than with the default JavaSerializer.
Then I increased the value of spark.shuffle.memoryFraction from 0.2 to 0.4 (even though it is deprecated in version 2.1.0, there is no description of what should be used instead) and enabled off-heap memory, and the shuffle size was reduced by ~20 GB. Even though that does what the title asks for, I would prefer a more algorithmic solution, or at least a more efficient compression.
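For reference, in $SPARK_HOME/conf/spark-defaults.conf those settings would look roughly like the lines below; the off-heap size is a placeholder value, and as far as I can tell spark.shuffle.memoryFraction is only honored when spark.memory.useLegacyMode is enabled:

    spark.shuffle.memoryFraction   0.4
    spark.memory.offHeap.enabled   true
    spark.memory.offHeap.size      4g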
Short answer: Use fastutil and possibly increase spark.shuffle.memoryFraction.
In more detail: The problem with this RDD is that Java needs to store Object references, which consume much more space than primitive types. In this example, it has to store Integers instead of int values. A Java Integer takes 16 bytes, while a primitive Java int takes only 4 bytes. Scala's Int type, on the other hand, is a 32-bit (4-byte) type just like Java's int, so people writing Scala may not have run into anything similar.
Besides increasing spark.shuffle.memoryFraction to 0.4, another nice solution was to use the fastutil library, as suggested in the Spark tuning documentation:
The first way to reduce memory consumption is to avoid the Java features that add overhead, such as pointer-based data structures and wrapper objects. There are several ways to do this. Design your data structures to prefer arrays of objects, and primitive types, instead of the standard Java or Scala collection classes (e.g. HashMap). The fastutil library provides convenient collection classes for primitive types that are compatible with the Java standard library.
This allows me to store each element of the int array in my pair RDD as an int (i.e. using 4 bytes instead of 16 for each element of the array). In my case, I used an IntArrayList instead of Integer[]. This significantly reduced the shuffle size and allowed me to run my program on the cluster. I also used the library in other parts of the code, where I was building some temporary Map structures. Overall, by increasing spark.shuffle.memoryFraction to 0.4 and using the fastutil library, the shuffle size dropped from 96 GB to 50 GB (!) with the default Java serializer (not Kryo).
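A minimal, self-contained sketch of this change (toy data, made-up class name); the key step is the mapValues that replaces each Integer[] with a fastutil IntArrayList before the groupByKey:

    import java.util.Arrays;
    import it.unimi.dsi.fastutil.ints.IntArrayList;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class FastutilShuffleSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("fastutil-shuffle-sketch").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Toy stand-in for the real (bucket id -> entity ids) input.
            JavaPairRDD<Integer, Integer[]> buckets = sc.parallelizePairs(Arrays.asList(
                    new Tuple2<>(1, new Integer[]{10, 11, 12}),
                    new Tuple2<>(2, new Integer[]{11, 13})));

            // Replace the boxed Integer[] with a primitive-backed IntArrayList
            // (4 bytes per element instead of a full Integer object per element).
            JavaPairRDD<Integer, IntArrayList> compact = buckets.mapValues(ids -> {
                IntArrayList list = new IntArrayList(ids.length);
                for (Integer id : ids) {
                    list.add(id.intValue());
                }
                return list;
            });

            // The shuffle now moves IntArrayLists, which serialize much more compactly.
            compact.groupByKey().collect()
                    .forEach(t -> System.out.println(t._1() + " -> " + t._2()));

            sc.stop();
        }
    }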
Alternative: I also tried sorting each int array of the pair RDD and storing the deltas, using Hadoop's VIntArrayWritable (smaller numbers take less space than bigger ones), but this also required registering VIntWritable and VIntArrayWritable in Kryo, and in the end it did not save any space. In general, I think Kryo only makes things faster, but does not reduce the space needed, although I'm not sure about that.
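For completeness, the delta idea looks roughly like the sketch below (not the exact code I used): sort the ids, then write the gaps with Hadoop's variable-length int encoding, so that small deltas take only one or two bytes each:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.WritableUtils;

    public class DeltaEncodeSketch {
        // Serializes a set of entity ids as (count, gap, gap, ...) using variable-length ints.
        public static byte[] encode(int[] ids) throws IOException {
            int[] sorted = ids.clone();
            Arrays.sort(sorted);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            WritableUtils.writeVInt(out, sorted.length);
            int previous = 0;
            for (int id : sorted) {
                WritableUtils.writeVInt(out, id - previous); // small gaps -> 1-2 bytes
                previous = id;
            }
            out.flush();
            return bytes.toByteArray();
        }
    }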
I am not marking this answer as accepted, because someone else may have a better idea, and because in the end I did not use Kryo, as my original question asked. I hope reading it will help someone else with the same problem. I will update this answer if I manage to reduce the shuffle size further.
I think the best approach that can be recommended here (without specific knowledge of the input data) is, in general, to use the persist API on your input RDD.
As a first step, I would try to call .persist(MEMORY_ONLY_SER) on the input RDD to reduce memory usage (albeit at some CPU cost, which should not be that much of a problem for ints in your case).
If that is not enough, you can try .persist(MEMORY_AND_DISK_SER), or, if your shuffle takes so much memory that the input dataset cannot be kept in memory, .persist(DISK_ONLY) may be an option, but one that will strongly degrade performance.
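In Java, that would look something like this sketch (the wrapper method is just for illustration):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.storage.StorageLevel;

    public class PersistSketch {
        // Cache the input in serialized form; switch the storage level if memory is still too tight.
        public static JavaPairRDD<Integer, Integer[]> cacheInput(JavaPairRDD<Integer, Integer[]> input) {
            return input.persist(StorageLevel.MEMORY_ONLY_SER());
            // or: input.persist(StorageLevel.MEMORY_AND_DISK_SER());
            // or: input.persist(StorageLevel.DISK_ONLY());
        }
    }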
Still not quite sure what you want to do. However, the fact that you are using groupByKey and say there is no way to do it with reduceByKey makes me more confused.
I think you have rdd = (Integer, Integer[]) and you want something like (Integer, Iterable[Integer[]]), which is why you are using groupByKey. Anyway, I am not very familiar with Java in Spark, but in Scala I would use reduceByKey to avoid the shuffle, with rdd.mapValues(Iterable(_)).reduceByKey(_ ++ _). Basically, you want to convert each value to a one-element collection and then combine the collections together.
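A rough Java equivalent of that Scala one-liner might look like the sketch below (class and method names are made up): wrap each array in a single-element list, then merge the lists per key:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;

    public class ReduceByKeySketch {
        // (entity id -> neighbor array) in, (entity id -> list of neighbor arrays) out.
        public static JavaPairRDD<Integer, List<Integer[]>> collectNeighbors(
                JavaPairRDD<Integer, Integer[]> rdd) {
            return rdd
                    .mapValues(arr -> {
                        List<Integer[]> one = new ArrayList<Integer[]>();
                        one.add(arr);
                        return one;
                    })
                    .reduceByKey((a, b) -> {
                        // build a fresh list so neither input is mutated
                        List<Integer[]> merged = new ArrayList<Integer[]>(a);
                        merged.addAll(b);
                        return merged;
                    });
        }
    }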