I have a set of log files that I would like to read into an RDD. The log files are all gzip-compressed, and the file names are date stamps.
I used sc.wholeTextFiles() to read in the files, and it looks like I'm running into Java heap memory issues. To isolate the problem, I decided to run it against a single file on the same machine as a test case.
I got the file from here:
http://dumps.wikimedia.org/other/pagecounts-raw/
Here are the file sizes, compressed and uncompressed versions:
myuser@fembuntu$ ls -ltr pagecounts-20090505-180000*
-rw-rw-r-- 1 myuser myuser 65170192 Sep 20 2009 pagecounts-20090505-180000.gz
-rw-rw-r-- 1 myuser myuser 233007266 Jan 22 00:23 pagecounts-20090505-180000.txt
and the available memory on the device is as follows:
myuser@fembuntu:~$ free -tm
             total       used       free     shared    buffers     cached
Mem:          4856       3018       1838        123         27        407
-/+ buffers/cache:       2583       2273
Swap:         5080        849       4231
Total:        9937       3867       6069
So, I launched the spark shell, providing the executor with 2G memory:
$ spark-shell --executor-memory 2G
scala> val pc_loc = "file:///home/myuser/data/pagecounts"
scala> val filename="/pagecounts-20090505-180000.gz"
filename: String = /pagecounts-20090505-180000.gz
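As a sanity check, the heap actually available to the shell's JVM can be printed from inside the shell. This is a minimal sketch using only the standard java.lang.Runtime API; the name maxHeapMb is just illustrative. I am assuming the shell runs in local mode, in which case tasks would execute inside this same JVM and --executor-memory may not be the setting that governs it.

```scala
// Maximum heap the current JVM will try to use, in megabytes.
// maxHeapMb is an illustrative name, not something Spark defines.
val maxHeapMb: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
println(s"Max JVM heap: $maxHeapMb MB")
```

If the number printed is well below 2G, the memory option given at launch is probably not being applied to the JVM that runs the tasks.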
Here I read the data with sc.textFile() and display the first two lines:
scala> var rdd=sc.textFile(pc_loc + filename)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:31
scala> rdd.take(2)
res0: Array[String] = Array(aa.b Help:Books 1 5263, aa.b Main_Page 1 5416)
It works great.
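Next, I read the same file with sc.wholeTextFiles() and split the contents into lines with flatMapValues(), which gives a pair RDD whose key is the file name. The values should be the same lines as in the RDD I got from sc.textFile().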
scala> val pair_rdd=sc.wholeTextFiles(pc_loc + filename).flatMapValues(y => y.split("\n"))
pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at flatMapValues at <console>:31
But taking the first two lines fails with a heap error:
scala> pair_rdd.take(2)
16/01/22 01:13:36 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapCharBuffer.<init>(HeapCharBuffer.java:57)
at java.nio.CharBuffer.allocate(CharBuffer.java:335)
at java.nio.charset.CharsetDecoder.decode(CharsetDecoder.java:795)
at org.apache.hadoop.io.Text.decode(Text.java:412)
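A rough back-of-the-envelope estimate of what that decode step might need is sketched below. It rests on assumptions of mine, suggested by the stack trace but not confirmed: that wholeTextFiles() materializes the whole file as a single String, that the decoder allocates about one char per input byte, and that the raw bytes and the decoded chars are both live at the same time. The helper toMb is illustrative only.

```scala
// Uncompressed size taken from the ls output earlier in this post.
val uncompressedBytes: Long = 233007266L

// Assumption: mostly single-byte characters, so about one char per byte,
// and a JVM char occupies 2 bytes.
val charBufferBytes: Long = uncompressedBytes * 2L

// Assumption: raw bytes and decoded chars coexist while decoding.
val peakBytes: Long = uncompressedBytes + charBufferBytes

// Illustrative helper: bytes to whole megabytes (rounded down).
def toMb(bytes: Long): Long = bytes / (1024L * 1024L)

println(s"raw bytes:     ${toMb(uncompressedBytes)} MB")
println(s"decoded chars: ${toMb(charBufferBytes)} MB")
println(s"rough peak:    ${toMb(peakBytes)} MB")
```

Under these assumptions a single record already needs several hundred MB before split("\n") makes any further copies, whereas sc.textFile() only ever holds one line at a time.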
Can anyone explain what is going on here? Is flatMapValues the cause of the problem, or do I simply need more Java heap space?