How to get the file name with sc.textFile in Spark?

I am reading the file directory using the following code:

val data = sc.textFile("/mySource/dir1/*")

Now my data RDD contains all the lines of all the files in the directory (right?).

Now I want to add a column to each row with the name of the source file. How can I do this?

I also tried wholeTextFiles, but I keep getting out-of-memory exceptions. My cluster: 5 servers with 24 cores and 24 GB each (executor-cores 5, executor-memory 5G). Any ideas?
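Roughly what I tried (a sketch; variable names are just placeholders). wholeTextFiles returns (fileName, fileContent) pairs but reads each file as a single record, which is presumably why it runs out of memory on large files:

val filesWithNames = sc.wholeTextFiles("/mySource/dir1/*")
// Each file arrives as one big string, so a large file becomes
// a single huge record in executor memory.
val linesWithNames = filesWithNames.flatMap { case (fileName, content) =>
  content.split("\n").map(line => (fileName, line))
}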

1 answer

You can use this code. I tested it with Spark 1.4 and 1.5.

The idea is to cast the RDD to a NewHadoopRDD and use mapPartitionsWithInputSplit, which hands each partition its inputSplit together with the iterator over its records; casting the split to a FileSplit gives access to the source file path.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local"))

// Input format and key/value classes for the new Hadoop API
val fc = classOf[TextInputFormat]
val kc = classOf[LongWritable]
val vc = classOf[Text]

val path: String = "file:///home/user/test"
val text = sc.newAPIHadoopFile(path, fc, kc, vc, sc.hadoopConfiguration)

// mapPartitionsWithInputSplit exposes each partition's InputSplit;
// casting it to FileSplit gives the path of the underlying file.
val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
  .mapPartitionsWithInputSplit((inputSplit, iterator) => {
    val file = inputSplit.asInstanceOf[FileSplit]
    iterator.map(tup => (file.getPath, tup._2))
  })

linesWithFileNames.foreach(println)
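Note that FileSplit.getPath returns a Hadoop Path rather than a String, and Hadoop reuses Text objects between records. A small follow-up (a sketch; the variable name is mine) converts the pairs into plain strings, which is safer if you cache or collect the RDD:

// Turn (Path, Text) pairs into plain (fileName, line) String pairs
val named = linesWithFileNames.map { case (path, line) =>
  (path.toString, line.toString)
}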

Source: https://habr.com/ru/post/1620380/

