When you create an RDD from a text file, you probably want to map the lines to a case class, so you can add the input source at that point:
case class Person(inputPath: String, name: String, age: Int)

val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map { l =>
  val tokens = l.split(",")
  Person(inputPath, tokens(0), tokens(1).trim().toInt)
}

rdd.collect().foreach(println)
If you do not want to mix the business data with the metadata, you can separate them:
case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)
and if you convert the RDD to a DataFrame and register it as a temporary table:
import sqlContext.implicits._

val df = rdd.toDF()
df.registerTempTable("x")
you can query it like this:
sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()
Update
You can list files in HDFS recursively using org.apache.hadoop.fs.FileSystem.listFiles().
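Here is a minimal sketch of collecting those file statuses, assuming the same demo directory as above and taking the Hadoop configuration from the SparkContext:

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
import scala.collection.mutable.ListBuffer

// List everything under the input directory; the second argument
// of listFiles() enables recursion into subdirectories.
val fs = FileSystem.get(sc.hadoopConfiguration)
val it = fs.listFiles(new Path("hdfs://localhost:9000/tmp/demo-input-data"), true)

val files = ListBuffer.empty[LocatedFileStatus]
while (it.hasNext) {
  files += it.next()
}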
Given the list of files in the value files (a standard Scala collection of org.apache.hadoop.fs.LocatedFileStatus), you can create one RDD per file:
val rdds = files.map { f =>
  val md = InputSourceMetaData(f.getPath.toString, f.getLen)

  sc.textFile(md.path).map { l =>
    val tokens = l.split(",")
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
  }
}
Now you can reduce the list of RDDs into a single one; the function passed to reduce concatenates them pairwise:
val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)
This works, but I can't verify whether it performs well with a large number of files.
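If the pairwise ++ turns out to be a problem with many files, one alternative worth trying is SparkContext.union, which builds a single flat union instead of a chain of nested ones; a sketch:

// Build one union over all the per-file RDDs in a single step
// instead of folding them together pairwise with ++.
val rddAll = sc.union(rdds.toSeq)
rddAll.collect().foreach(println)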