I'm currently trying to train a set of Word2Vec vectors on the UMBC WebBase corpus (about 30 GB of text spread across 400 files).
I keep running into out-of-memory situations, even on machines with 100+ GB of RAM. I run Spark inside the application itself (local mode). I have tried to tune it a little, but I still cannot get the job to work on more than 10 GB of text data. The obvious bottleneck of my implementation is the union of the previously computed per-file RDDs; that is where the out-of-memory exception is thrown.
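To make the failing pattern concrete, here is a stripped-down sketch of what the code below boils down to (the file names and the whitespace tokenization are only placeholders for my real preprocessing):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import scala.io.Source

object UnionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("UnionSketch").setMaster("local[*]"))

    // Placeholder file list; in reality there are ~400 corpus files.
    val files = Seq("part-001.txt", "part-002.txt")

    // One driver-side read + parallelize per file, each persisted in serialized form.
    val perFileRdds = files.map { path =>
      val lines = Source.fromFile(path, "utf-8").getLines()
        .map(_.split("\\s+").toSeq)   // placeholder for the real tokenization
        .toList
      sc.parallelize(lines).persist(StorageLevel.MEMORY_ONLY_SER)
    }

    // Merging all per-file RDDs into one: this union is where the OOM shows up.
    val corpus = sc.union(perFileRdds)
    println(corpus.count())
  }
}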
Maybe someone has experience with a more memory-efficient way of doing this than the following:
import java.io.File
import java.util.Properties

import scala.collection.parallel.ParSeq
import scala.io.Source

import edu.stanford.nlp.pipeline.StanfordCoreNLP

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object SparkJobs {

  val conf = new SparkConf()
    .setAppName("TestApp")
    .setMaster("local[*]")
    .set("spark.executor.memory", "100g")
    .set("spark.rdd.compress", "true")

  val sc = new SparkContext(conf)

  def trainBasedOnWebBaseFiles(path: String): Unit = {
    val folder: File = new File(path)
    val files: ParSeq[File] = folder.listFiles(new TxtFileFilter).toIndexedSeq.par

    var i = 0
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit")
    props.setProperty("nthreads", "2")
    val pipeline = new StanfordCoreNLP(props)

    // Preprocess the files in parallel: each file becomes its own persisted RDD.
    val training_data_raw: ParSeq[RDD[Seq[String]]] = files.map(file => {
      println(file.getName() + " - " + file.length())
      // Preprocess each line of the file (tokenization, stop-word filtering, etc.).
      val rdd_lines: Iterator[Option[Seq[String]]] =
        for (line <- Source.fromFile(file, "utf-8").getLines) yield {
          processWebBaseLine(pipeline, line)
        }
      val filtered_rdd_lines = rdd_lines.filter(line => line.isDefined).map(line => line.get).toList
      println(s"File $i done")
      i = i + 1
      sc.parallelize(filtered_rdd_lines).persist(StorageLevel.MEMORY_ONLY_SER)
    })

    // Merge all per-file RDDs into a single training RDD -- this is where the OOM occurs.
    val rdd_file = sc.union(training_data_raw.seq)

    val starttime = System.currentTimeMillis()
    println("Start Training")
    val word2vec = new Word2Vec()
    word2vec.setVectorSize(100)
    val model: Word2VecModel = word2vec.fit(rdd_file)
    println("Training time: " + (System.currentTimeMillis() - starttime))

    ModelUtil.storeWord2VecModel(model, Config.WORD2VEC_MODEL_PATH)
  }
}
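For completeness, this is roughly how the job is invoked (the corpus path is just a placeholder); nothing else touches the SparkContext before this call:

object Main {
  def main(args: Array[String]): Unit = {
    // Placeholder path to the directory holding the unpacked UMBC WebBase .txt files
    SparkJobs.trainBasedOnWebBaseFiles("/data/umbc/webbase_all")
  }
}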