Spark: how to convert a Seq of RDDs into a single RDD

I'm just starting out with Spark and Scala.

I have a directory with several files in it. I can load them successfully using

sc.wholeTextFiles(directory)

Now I want to go one level up. I actually have a directory containing subdirectories, which in turn contain the files. My goal is to get a single RDD[(String,String)], where each element holds the name and contents of one file, so that I can move forward.

I tried the following:

val listOfFolders = getListOfSubDirectories(rootFolder)
val input = listOfFolders.map(directory => sc.wholeTextFiles(directory))

but I got a Seq[RDD[(String,String)]]. How do I convert this Seq into an RDD[(String,String)]?

Or maybe I'm going about this the wrong way and should try a different approach?

Edit: code added

// HADOOP VERSION
val rootFolderHDFS = "hdfs://****/"
val hdfsURI = "hdfs://****/**/"

// returns a list of folders (currently about 800)
val listOfFoldersHDFS = ListDirectoryContents.list(hdfsURI,rootFolderHDFS)
val inputHDFS = listOfFoldersHDFS.map(directory => sc.wholeTextFiles(directory))
// RDD[(String,String)]
//    val inputHDFS2 = inputHDFS.reduceRight((rdd1,rdd2) => rdd2 ++ rdd1)
val init = sc.parallelize(Array[(String, String)]())
val inputHDFS2 = inputHDFS.foldRight(init)((rdd1,rdd2) => rdd2 ++ rdd1)

// throws org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.StackOverflowError
println(inputHDFS2.count)

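Instead of loading each directory into a separate RDD, could you use a path wildcard to load all the directories into a single RDD?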

Given the following directory tree:

$ tree test/spark/so
test/spark/so
├── a
│   ├── text1.txt
│   └── text2.txt
└── b
    ├── text1.txt
    └── text2.txt

Create the RDD with a wildcard in place of the directory name.

scala> val rdd =  sc.wholeTextFiles("test/spark/so/*/*")
rdd: org.apache.spark.rdd.RDD[(String, String)] = test/spark/so/*/ WholeTextFileRDD[16] at wholeTextFiles at <console>:37

The count is 4, as expected.

scala> rdd.count
res9: Long = 4

scala> rdd.collect
res10: Array[(String, String)] =
Array((test/spark/so/a/text1.txt,a1
a2
a3), (test/spark/so/a/text2.txt,a3
a4
a5), (test/spark/so/b/text1.txt,b1
b2
b3), (test/spark/so/b/text2.txt,b3
b4
b5))
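
If the folders do not fit a single glob pattern, one option that also avoids building a Seq of RDDs is to pass all folders in one call: the path argument of wholeTextFiles can be a comma-separated list of paths. The sketch below is only an illustration under assumptions: the object name, the loadAll helper, and the temporary sample tree are hypothetical, and it runs Spark in local mode. Folder names that themselves contain commas would not work with this approach.

```scala
import java.nio.file.{Files, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CommaSeparatedPaths {
  // Loads every file under the given folders into one RDD of (path, content)
  // by joining the folder names into a single comma-separated path string.
  def loadAll(sc: SparkContext, folders: Seq[String]): RDD[(String, String)] =
    sc.wholeTextFiles(folders.mkString(","))

  // Hypothetical helper: builds a throwaway tree (a/ and b/, two files each)
  // that mirrors the layout shown in the answer above.
  def makeSampleTree(): Path = {
    val root = Files.createTempDirectory("so")
    for (dir <- Seq("a", "b"); file <- Seq("text1.txt", "text2.txt")) {
      val d = Files.createDirectories(root.resolve(dir))
      Files.write(d.resolve(file), s"$dir/$file".getBytes("UTF-8"))
    }
    root
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("comma-paths").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val root = makeSampleTree()
      // Stand-in for the question's getListOfSubDirectories(rootFolder).
      val folders = Seq("a", "b").map(name => root.resolve(name).toString)
      println(loadAll(sc, folders).count())
    } finally sc.stop()
  }
}
```

In the question's code this would amount to calling sc.wholeTextFiles(listOfFolders.mkString(",")) once instead of mapping over the list.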

You can reduce the Seq like this (concatenating the RDDs with ++):

val reduced: RDD[(String, String)] = input.reduce((left, right) => left ++ right)

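A few notes on why reduce works here:

  • ++ is associative: it doesn't matter whether you compute rdda ++ (rddb ++ rddc) or (rdda ++ rddb) ++ rddc.
  • It assumes that the Seq is non-empty (otherwise fold would be the better choice, with an empty RDD[(String, String)] as the initial accumulator).

Depending on the exact type of the Seq you might get a stack overflow, so be careful and test with a larger collection.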
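
For the possibly empty case mentioned in the second bullet, a minimal sketch of the fold variant could look like the following. It uses sc.emptyRDD as the initial accumulator; the object name, the concatAll helper, and the sample pairs are made up for illustration. Note that it still chains ++ once per element, so it does not address the stack overflow concern for long sequences.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SafeConcat {
  // Concatenates all RDDs in the Seq. For an empty Seq this returns an empty
  // RDD, whereas reduce on an empty Seq throws UnsupportedOperationException.
  def concatAll(sc: SparkContext,
                rdds: Seq[RDD[(String, String)]]): RDD[(String, String)] =
    rdds.foldLeft(sc.emptyRDD[(String, String)])(_ ++ _)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("safe-concat").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Hypothetical sample data standing in for wholeTextFiles results.
      val rdds = Seq(
        sc.parallelize(Seq(("a/text1.txt", "a1"), ("a/text2.txt", "a3"))),
        sc.parallelize(Seq(("b/text1.txt", "b1")))
      )
      println(concatAll(sc, rdds).count())      // non-empty input
      println(concatAll(sc, Seq.empty).count()) // empty input, no exception
    } finally sc.stop()
  }
}
```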

You should use the union method provided by the Spark context:

val rdds: Seq[RDD[Int]] = (1 to 100).map(i => sc.parallelize(Seq(i)))
val rdd_union: RDD[Int] = sc.union(rdds) 
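
Applied to the question, this would be sc.union(listOfFolders.map(directory => sc.wholeTextFiles(directory))). A likely reason it helps with the StackOverflowError from the question's edit is that each ++ wraps the previous result in another union, so folding about 800 RDDs gives a lineage about 800 levels deep, while sc.union builds one union over all inputs. The sketch below illustrates this under assumptions: the object name and both helpers are hypothetical, and 800 tiny parallelized RDDs stand in for the real wholeTextFiles results.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionMany {
  // One call to sc.union over the whole Seq, instead of chaining ++ pairwise.
  def unionAll(sc: SparkContext,
               rdds: Seq[RDD[(String, String)]]): RDD[(String, String)] =
    sc.union(rdds)

  // Hypothetical stand-in for the question's per-folder RDDs: each tiny RDD
  // holds a single (fileName, content) pair in one partition.
  def fakeFolderRdds(sc: SparkContext, n: Int): Seq[RDD[(String, String)]] =
    (1 to n).map { i =>
      sc.parallelize(Seq((s"folder$i/text1.txt", s"content $i")), 1)
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("union-many").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val merged = unionAll(sc, fakeFolderRdds(sc, 800))
      println(merged.count())
    } finally sc.stop()
  }
}
```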

Source: https://habr.com/ru/post/1569554/