Memory-efficient way to combine a sequence of RDDs from files in Apache Spark

I'm currently trying to train a set of Word2Vec vectors on the UMBC WebBase corpus (about 30 GB of text in 400 files).

I often run into out-of-memory situations even on machines with 100+ GB of RAM. I run Spark inside the application itself. I tried to fine-tune it a little, but I still cannot perform this operation on more than 10 GB of text data. The obvious bottleneck of my implementation is the union of the previously computed RDDs, which is where the out-of-memory exception is thrown.

Perhaps you have experience with a more memory-efficient implementation than this:

object SparkJobs {

  val conf = new SparkConf()
    .setAppName("TestApp")
    .setMaster("local[*]")
    .set("spark.executor.memory", "100g")
    .set("spark.rdd.compress", "true")

  val sc = new SparkContext(conf)

  def trainBasedOnWebBaseFiles(path: String): Unit = {
    val folder: File = new File(path)
    val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par
    var i = 0
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit")
    props.setProperty("nthreads", "2")
    val pipeline = new StanfordCoreNLP(props)

    // preprocess files in parallel
    val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => {
      // preprocess each line of the file
      println(file.getName() + "-" + file.getTotalSpace())
      val rdd_lines: Iterator[Option[Seq[String]]] =
        for (line <- Source.fromFile(file, "utf-8").getLines) yield {
          // performs some preprocessing like tokenization, stop word filtering etc.
          processWebBaseLine(pipeline, line)
        }
      val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList
      println(s"File $i done")
      i = i + 1
      sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER)
    })

    val rdd_file = sc.union(training_data_raw.seq)

    val starttime = System.currentTimeMillis()
    println("Start Training")
    val word2vec = new Word2Vec()
    word2vec.setVectorSize(100)
    val model: Word2VecModel = word2vec.fit(rdd_file)
    println("Training time: " + (System.currentTimeMillis() - starttime))
    ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH)
  }
}
2 answers

As Sarvesh points out in the comments, this is probably too much data for a single machine. Use more machines. We typically see that working with a 1 GB file takes about 20 GB of memory. By this (extremely rough) estimate, you would need 600 GB of memory for 30 GB of input. (You can get a more accurate estimate by loading part of the data.)

As a more general comment, I would advise you to avoid using rdd.union and sc.parallelize altogether. Instead, use sc.textFile to load all the files into a single RDD.
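For instance, a minimal sketch of that approach, assuming the 400 corpus files sit in a single directory path and a hypothetical preprocess(line): Option[Seq[String]] stands in for the asker's processWebBaseLine, could look roughly like this:

import org.apache.spark.mllib.feature.Word2Vec

// One RDD over all files; Spark handles splitting and distribution,
// so there is no per-file sc.parallelize and no sc.union afterwards.
val lines = sc.textFile(path + "/*.txt")

// Run the per-line preprocessing on the executors; flatMap over the
// Option simply drops lines that were filtered out.
val sentences = lines.flatMap(line => preprocess(line))

val model = new Word2Vec().setVectorSize(100).fit(sentences)

This keeps the data partitioned across the executors instead of building 400 separate RDDs on the driver and unioning them at the end.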


Have you tried training word2vec on a smaller corpus? I mention this because I was running Spark's word2vec implementation on a much smaller corpus and still had problems, due to this issue: http://mail-archives.apache.org/mod_mbox/spark-issues/201412.mbox/%3CJIRA.12761684.1418621192000.36769.1418759475999@Atlassian.JIRA%3E

So, for my use case, that issue made the Spark word2vec implementation rather useless. I still used Spark to preprocess my corpus, but not to actually compute the vectors.

  • As the other answer suggests, avoid calling rdd.union .
  • I also think that .toList will collect every line of the file on your driver machine (the one used to submit the job), which is probably why you are running out of memory. You should avoid turning the data into a list entirely; see the sketch after this list.
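To make that concrete, here is a rough sketch (not the asker's code, and reusing the question's processWebBaseLine and the tokenize/ssplit CoreNLP properties) of how the preprocessing could run entirely on the executors. It builds one pipeline per partition, because StanfordCoreNLP is not serializable, and never materializes a list on the driver:

import java.util.Properties
import edu.stanford.nlp.pipeline.StanfordCoreNLP
import org.apache.spark.mllib.feature.Word2Vec

val lines = sc.textFile(path + "/*.txt")

val sentences = lines.mapPartitions { iter =>
  // Build the non-serializable CoreNLP pipeline once per partition,
  // on the executor, instead of shipping it from the driver.
  val props = new Properties()
  props.setProperty("annotators", "tokenize, ssplit")
  val pipeline = new StanfordCoreNLP(props)

  // Lazily preprocess each line; no .toList, so nothing is
  // collected back to the driver.
  iter.flatMap(line => processWebBaseLine(pipeline, line))
}

val model = new Word2Vec().setVectorSize(100).fit(sentences)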
