HDFS Recording in Spark / Scala

I am writing a Spark/Scala program to read ZIP files, unzip them, and write the contents to a set of new files. I can make this work when writing to the local file system, but wondered whether there is a way to write the output files to a distributed file system such as HDFS. The code is shown below:

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i = 1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) => {
  val zipStream = new ZipInputStream(file._2.open)
  val entry = zipStream.getNextEntry
  val iter = scala.io.Source.fromInputStream(zipStream).getLines

  val fname = f"/d/tmp/myfile$i.txt"
  i = i + 1

  val xx = iter.mkString
  val writer = new PrintWriter(new File(fname))
  writer.write(xx)
  writer.close()

  iter
}).collect()
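As an aside, the snippet above only reads the first zip entry, and it also exhausts `iter` with `mkString` before returning it, so the returned iterator is empty. A plain-JVM sketch of iterating every entry of an archive (no Spark or HDFS involved; the helper name is illustrative):

```scala
import java.io.InputStream
import java.util.zip.ZipInputStream
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Read the lines of every entry in a zip archive, not just the first one.
def unzipAllLines(in: InputStream): List[String] = {
  val zip = new ZipInputStream(in)
  val lines = ListBuffer[String]()
  var entry = zip.getNextEntry
  while (entry != null) {
    // Source reads from the current entry until ZipInputStream signals
    // end-of-entry, then we advance to the next entry.
    lines ++= Source.fromInputStream(zip).getLines()
    entry = zip.getNextEntry
  }
  zip.close()
  lines.toList
}
```

The same loop structure can be dropped into the `flatMap` closure so that multi-entry archives are handled.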


4 answers

You can easily write data to HDFS using the Hadoop client library (if you use sbt as your dependency management tool, add `hadoop-client` to your dependencies). With it, you can create a FileSystem object:

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem

 private val fs = {
    val conf = new Configuration()
    FileSystem.get(conf)
  }

(Make sure your core-site.xml is on the classpath so the Configuration picks up your cluster settings, such as fs.defaultFS.)
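A minimal core-site.xml might look like the following (the namenode host and port are placeholders for your cluster's values):

```xml
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode.example.com:8020</value>
  </property>
</configuration>
```

With this in place, `FileSystem.get(conf)` returns a client bound to that HDFS cluster instead of the local file system.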

Then, given a String and a path (assumed to point into HDFS), you can write the content like this:

import java.io.{BufferedWriter, IOException, OutputStreamWriter}
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

@throws[IOException]
  def writeAsString(hdfsPath: String, content: String) {
    val path: Path = new Path(hdfsPath)
    if (fs.exists(path)) {
      fs.delete(path, true)
    }
    val dataOutputStream: FSDataOutputStream = fs.create(path)
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
    bw.write(content)
    bw.close()
  }

Take a look at saveAsTextFile: http://spark.apache.org/docs/latest/programming-guide.html

It writes an RDD to HDFS. Call it on the RDD returned by your flatMap, rather than writing files inside the closure:

rdd.saveAsTextFile("hdfs://...")

Use saveAsTextFile.

It writes the data to HDFS (or any other file system supported by Hadoop). Spark calls toString on each element to convert it to a line of text in the output.

It will save each partition as a separate file. The number of partitions you end up with will be the same as the number of input files, unless you repartition or coalesce.
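For example, to control how many output files are produced, you can coalesce before saving (a sketch; `rdd` stands for the RDD produced by the flatMap above, and the path is a placeholder):

```scala
// coalesce(1) merges all partitions, so a single part file is written.
// Note that saveAsTextFile treats its argument as an output *directory*.
rdd.coalesce(1).saveAsTextFile("hdfs://namenode.example.com:8020/user/example/output")
```

Coalescing to one partition funnels all data through a single task, so only do this when the output is small enough to fit comfortably on one executor.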

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import scala.io.Source

sc.binaryFiles("/user/example/zip_dir", 10)                             //make an RDD from *.zip files in HDFS
            .flatMap((file: (String, PortableDataStream)) => {          //flatmap to unzip each file
                val zipStream = new ZipInputStream(file._2.open)        //open a java.util.zip.ZipInputStream
                val entry = zipStream.getNextEntry                      //get the first entry in the stream
                val iter = Source.fromInputStream(zipStream).getLines   //place entry lines into an iterator
                iter.next                                               //pop off the iterator's first line
                iter                                                    //return the iterator
            })
            .saveAsTextFile("/user/example/quoteTable_csv/result.csv")  //note: this path becomes a directory of part files

Source: https://habr.com/ru/post/1676120/

