Spark - get file name in RDD

I am trying to process 4 directories of text files that keep growing every day. When someone searches for an invoice number, I need to give them a list of the files that contain it.

I was able to map and reduce the values in the text files by loading them as an RDD. But how can I get the file name and other file attributes?

+10
source share
7 answers

Starting with Spark 1.6, you can combine the text data source and the input_file_name function as follows:

Scala

    import org.apache.spark.sql.functions.input_file_name

    val inputPath: String = ???

    spark.read.text(inputPath)
      .select(input_file_name, $"value")
      .as[(String, String)] // Optionally convert to Dataset
      .rdd                  // or to RDD

Python

(Versions before 2.x are buggy and may not preserve names when converting to RDD):

    from pyspark.sql.functions import input_file_name

    (spark.read.text(input_path)
        .select(input_file_name(), "value")
        .rdd)

This can be used with other input formats.
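As a usage sketch tying this back to the original question (the invoice value is a placeholder I made up, and inputPath and spark.implicits._ are assumed from the Scala snippet above), you could list the distinct files containing a given invoice number like this:

    import org.apache.spark.sql.functions.input_file_name

    // Illustrative only: the invoice number and inputPath are placeholders.
    val invoice = "INV-12345"

    spark.read.text(inputPath)
      .select(input_file_name().as("file"), $"value")
      .where($"value".contains(invoice))
      .select($"file")
      .distinct()
      .show(false)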

+26
source

If your text files are small enough, you can use SparkContext.wholeTextFiles, which returns an RDD of (filename, content) pairs.
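For example, a minimal sketch of how this could answer the original question (the path and invoice number are made up for illustration):

    // Sketch only: path and invoice number are placeholders.
    val invoice = "INV-12345"

    val matchingFiles = sc.wholeTextFiles("/data/invoices/*")
      .filter { case (fileName, content) => content.contains(invoice) }
      .keys
      .collect()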

+4
source

If your text files are too large for SparkContext.wholeTextFiles, you should use a (simple) custom InputFormat and then call SparkContext.newAPIHadoopFile.

In the InputFormat, you return a (file name, line) tuple instead of just the line; you can then filter with a predicate that looks at the line contents, take the distinct file names, and collect them.

On the Spark side, the code would look something like this:

    val ft = classOf[FileNamerInputFormat]
    val kt = classOf[String]
    val vt = classOf[String]
    val hadoopConfig = new Configuration(sc.hadoopConfiguration)

    sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig)
      .filter { case (f, l) => isInteresting(l) }
      .map { case (f, _) => f }
      .distinct()
      .collect()
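FileNamerInputFormat is not a built-in class; it is the custom InputFormat the answer refers to. A minimal sketch of what it might look like (illustrative only, assuming the new Hadoop API and wrapping the standard LineRecordReader):

    import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit, LineRecordReader}

    // Illustrative sketch: emits (file name, line) pairs by delegating to the
    // standard LineRecordReader and reporting the split's file path as the key.
    class FileNamerInputFormat extends FileInputFormat[String, String] {
      override def createRecordReader(split: InputSplit,
                                      context: TaskAttemptContext): RecordReader[String, String] =
        new FileNameRecordReader
    }

    class FileNameRecordReader extends RecordReader[String, String] {
      private val lineReader = new LineRecordReader
      private var fileName: String = _

      override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
        // Remember the file this split belongs to, then delegate line reading.
        fileName = split.asInstanceOf[FileSplit].getPath.toString
        lineReader.initialize(split, context)
      }

      override def nextKeyValue(): Boolean = lineReader.nextKeyValue()
      override def getCurrentKey: String = fileName
      override def getCurrentValue: String = lineReader.getCurrentValue.toString
      override def getProgress: Float = lineReader.getProgress
      override def close(): Unit = lineReader.close()
    }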
+3
source

You can try this if you are using PySpark:

  test = sc.wholeTextFiles("pathtofile") 

You will get back an RDD of pairs where the first element is the file path and the second element is the file content.

+3
source

You can use wholeTextFiles() to achieve this. However, if the input files are large, it would be counterproductive to use wholeTextFiles(), since it puts the entire contents of each file into a single record.

The best way to get the file names in such a scenario is to use mapPartitionsWithInputSplit(). You can find a working example of this approach on my blog.
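As a rough sketch of the idea (the path is a placeholder, and the cast is needed because hadoopFile returns a plain RDD even though the underlying class is HadoopRDD):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    // Read with the old Hadoop API so the result is a HadoopRDD, which exposes
    // mapPartitionsWithInputSplit. The path is illustrative.
    val hadoopRdd = sc.hadoopFile(
        "/data/invoices",
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text]
      ).asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Attach the file name from each InputSplit to every line it contains.
    val linesWithFile = hadoopRdd.mapPartitionsWithInputSplit { (split, iter) =>
      val fileName = split.asInstanceOf[FileSplit].getPath.toString
      iter.map { case (_, line) => (fileName, line.toString) }
    }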

+3
source

It seems unnecessary to use Spark for this at all... If the data is going to be "collected" on the driver anyway, why not use the HDFS API directly? Hadoop often ships alongside Spark. Here is an example:

    import java.net.URI

    import org.apache.hadoop.conf._
    import org.apache.hadoop.fs._

    val fileSpec = "/data/Invoices/20171123/21"
    val conf = new Configuration()
    val fs = FileSystem.get(new URI("hdfs://nameNodeEneteredHere"), conf)
    val path = new Path(fileSpec)
    // if (fs.exists(path) && fs.isDirectory(path)) ...
    val fileList = fs.listStatus(path)

Then println(fileList(0)) will show the first element, an org.apache.hadoop.fs.FileStatus, formatted similarly to this (as an example):

 FileStatus { path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; isDirectory=false; length=29665563; replication=3; blocksize=134217728; modification_time=1511810355666; access_time=1511838291440; owner=codeaperature; group=supergroup; permission=rw-r--r--; isSymlink=false } 

Here fileList(0).getPath gives hdfs://nameNodeEneteredHere/Invoices-0001.avro.
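If you only need the path strings (for example, to return to whoever asked for the invoice), a small follow-up sketch:

    // Extract just the path strings from the FileStatus entries.
    val fileNames = fileList.map(_.getPath.toString)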

I assume this means that listing the files goes primarily through the HDFS namenode rather than happening on each executor. TL;DR: I suspect Spark most likely polls the namenode to build the RDD anyway; if the underlying Spark call queries the namenode to manage the RDD, perhaps the above is an efficient solution. Nevertheless, informative comments suggesting either direction are welcome.

+1
source

If you use the DataFrame API, you can get the file names from HDFS using the input_file_name function from org.apache.spark.sql.functions. The snippets below should help you understand.

    import org.apache.spark.sql.functions.{input_file_name, split}
    import org.apache.spark.sql.types.StringType

    val df = spark.read.csv("/files/")
    val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType))
    val df3 = df.withColumn("file_name", input_file_name())

df2 now includes a new column named "file_name" that contains the HDFS file name extracted with the split function (the index 7 depends on how many path segments precede the file name). If you need the full HDFS path, use input_file_name() on its own, as shown for df3.

0
source

Source: https://habr.com/ru/post/985303/

