How to add source file name to each line in Spark?

I am new to Spark and trying to insert a column in each input line with the name of the file from which it is derived.

I saw others ask a similar question, but all of the answers used wholeTextFiles, whereas I'm trying to do this for large CSV files (read using the Spark-CSV library), JSON files and Parquet files (not just small text files).

I can use spark-shell to get a list of file names:

    // Later Spark releases call this function input_file_name()
    val df = sqlContext.read.parquet("/blah/dir")
    val names = df.select(inputFileName())
    names.show

but this is a separate DataFrame. I am not sure how to add it as a column to each row (or whether this result is ordered the same way as the source data, although I assume it always is), or how to do this as a general solution for all input types.

2 answers

Another solution I just found is to add the file name as one of the columns in the DataFrame:

    import org.apache.spark.sql.functions.input_file_name

    val df = sqlContext.read.parquet("/blah/dir")
    val dfWithCol = df.withColumn("filename", input_file_name())

Ref: fix load information and add file name as DataFrame column
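Because `input_file_name()` is an ordinary column expression, the same call should work for any file-based reader, which covers the "all input types" part of the question. A minimal sketch, assuming Spark 2.x or later (where `SparkSession` is the entry point and CSV reading is built in); the object and helper names are hypothetical, and the column is added directly on the freshly read DataFrame:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name

object SourceFileColumn {
  // Hypothetical helper: appends a column holding the path of the file
  // each row was read from.
  def withSourceFile(df: DataFrame, colName: String = "filename"): DataFrame =
    df.withColumn(colName, input_file_name())

  // Hypothetical helper: `format` is any file-based source name supported by
  // the running Spark build, e.g. "parquet", "json" or "csv".
  def readWithSourceFile(spark: SparkSession, format: String, path: String): DataFrame =
    withSourceFile(spark.read.format(format).load(path))
}
```

In spark-shell this could be used as `SourceFileColumn.readWithSourceFile(spark, "json", "/blah/dir").show(false)`. On older versions the same idea would go through `sqlContext.read` instead.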


When you create an RDD from a text file, you probably want to map the data to a case class anyway, so you can add the input source at that point:

    case class Person(inputPath: String, name: String, age: Int)

    val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
    val rdd = sc.textFile(inputPath).map { l =>
      val tokens = l.split(",")
      Person(inputPath, tokens(0), tokens(1).trim().toInt)
    }
    rdd.collect().foreach(println)

If you do not want to mix business data with metadata:

    case class InputSourceMetaData(path: String, size: Long)
    case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)

    // Fake the size, for demo purposes only
    val md = InputSourceMetaData(inputPath, size = -1L)
    val rdd = sc.textFile(inputPath).map { l =>
      val tokens = l.split(",")
      PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
    }
    rdd.collect().foreach(println)

and if you promote the RDD to a DataFrame:

    import sqlContext.implicits._

    val df = rdd.toDF()
    df.registerTempTable("x")

you can query it like this:

    sqlContext.sql("select name, metadata from x").show()
    sqlContext.sql("select name, metadata.path from x").show()
    sqlContext.sql("select name, metadata.path, metadata.size from x").show()

Update

You can list the files in HDFS recursively using org.apache.hadoop.fs.FileSystem.listFiles().

Given the list of files in a value named files (a standard Scala collection of org.apache.hadoop.fs.LocatedFileStatus), you can create one RDD per file:

    val rdds = files.map { f =>
      val md = InputSourceMetaData(f.getPath.toString, f.getLen)
      sc.textFile(md.path).map { l =>
        val tokens = l.split(",")
        PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
      }
    }
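The snippet above takes `files` as given. One way it might be built is sketched here, assuming the Hadoop client libraries are on the classpath (as they are in spark-shell); the object and method names are hypothetical. `listFiles` returns a Hadoop `RemoteIterator`, so it is drained into a Scala `List` by hand:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import scala.collection.mutable.ListBuffer

object ListInputFiles {
  // Hypothetical helper: returns every file (not directory) below `dir`.
  def listFilesRecursively(
      dir: String,
      conf: Configuration = new Configuration()): List[LocatedFileStatus] = {
    val path = new Path(dir)
    val fs = path.getFileSystem(conf)   // resolves hdfs://, file://, ...
    val it = fs.listFiles(path, true)   // true = descend into subdirectories
    val found = ListBuffer.empty[LocatedFileStatus]
    while (it.hasNext) found += it.next()
    found.toList
  }
}
```

Inside spark-shell, passing `sc.hadoopConfiguration` as `conf` would reuse the cluster settings, for example `val files = ListInputFiles.listFilesRecursively("hdfs://localhost:9000/tmp/demo-input-data", sc.hadoopConfiguration)`.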

Now you can reduce the list of RDDs to a single one; the function passed to reduce concatenates the RDDs pairwise:

    val rdd = rdds.reduce(_ ++ _)
    rdd.collect().foreach(println)

This works, but I can't check whether it works well with large files.
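A possible alternative to `reduce(_ ++ _)` is `SparkContext.union`, which takes the whole sequence at once. Each `++` wraps two RDDs in a new union, so with many files the pairwise reduce builds a deeply nested lineage, whereas `sc.union` should produce a single flat one. This is a sketch rather than a measured improvement, and the object and method names are hypothetical:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object UnionAll {
  // Hypothetical helper: combines all per-file RDDs in a single union step
  // instead of chaining pairwise unions.
  def unionAll[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] =
    sc.union(rdds)
}
```

With the per-file RDDs from above this would read `val rdd = UnionAll.unionAll(sc, rdds)`.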


Source: https://habr.com/ru/post/1234333/