I'm just starting out with Spark and Scala.
I have a directory with several files in it, and I can successfully load them using
sc.wholeTextFiles(directory)
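As I understand it, this gives me an RDD of (file path, file contents) pairs, something like the following sketch (the path and the printed preview here are just made-up examples):

// Sketch: wholeTextFiles returns one (path, content) pair per file
val files: org.apache.spark.rdd.RDD[(String, String)] = sc.wholeTextFiles("hdfs://host/data")
files.take(2).foreach { case (path, content) =>
  println(s"$path -> ${content.take(20)}")
}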
Now I want to go one level up. I actually have a root directory containing subdirectories, which in turn contain the files. My goal is to get a single RDD[(String, String)] to work with, where the RDD represents the name and contents of each file.
I tried the following:
val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))
but I got a Seq[RDD[(String, String)]].
How can I convert this Seq into a single RDD[(String, String)]?
Or maybe I'm not going about this the right way and should try a different approach?
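For reference, here is a minimal sketch of the kind of conversion I have in mind (assuming SparkContext.union, or reducing the Seq with ++, is the right tool; I'm not sure which is idiomatic):

// Sketch: collapse a Seq[RDD[(String, String)]] into one RDD
val merged: org.apache.spark.rdd.RDD[(String, String)] = sc.union(input)
// or, equivalently (fails on an empty Seq):
// val merged = input.reduce(_ ++ _)
println(merged.count)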
Edit: code added
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// List the subdirectories under the root folder (ListDirectoryContents is my own helper)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI, rootFolderHDFS)

// One RDD[(String, String)] per subdirectory
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))

// Fold the sequence of RDDs into a single RDD, starting from an empty one
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1, rdd2) => rdd2 ++ rdd1)

println(inputHDFS2.count)
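(ListDirectoryContents.list is my own helper; roughly, it uses the Hadoop FileSystem API to return the paths of the subdirectories under the root folder, along the lines of this simplified sketch:)

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListDirectoryContents {
  // Return the full HDFS paths of the immediate subdirectories of rootFolder
  def list(hdfsURI: String, rootFolder: String): Seq[String] = {
    val fs = FileSystem.get(new URI(hdfsURI), new Configuration())
    fs.listStatus(new Path(rootFolder))
      .filter(_.isDirectory)
      .map(_.getPath.toString)
      .toSeq
  }
}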