How to convert a CSV file to an RDD

I'm new to Spark. I want to perform some operations on particular data in a CSV record.

I am trying to read a CSV file and convert it to an RDD. My further operations are based on the header provided in the CSV file.

(From comments) This is my code:

    final JavaRDD<String> File = sc.textFile(Filename).cache();
    final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) {
            return Arrays.asList(EOL.split(s));
        }
    });
    final String heading = lines.first().toString();

I can get the header values as follows:

    final String[] header = heading.split(" ");

I want to map these header values to every entry in the CSV file.

In Java I'm using CSVReader with record.getColumnValue(Column header) to get a specific value. I need to do something like that here.

+45
scala apache-spark
Jun 19 '14 at 5:35
12 answers

A simplified approach would be to keep the header.

Say you have a CSV file like this:

    user, topic, hits
    om, scala, 120
    daniel, spark, 80
    3754978, spark, 1

We can define a header class that uses a parsed version of the first line:

    class SimpleCSVHeader(header: Array[String]) extends Serializable {
      val index = header.zipWithIndex.toMap
      def apply(array: Array[String], key: String): String = array(index(key))
    }

We can then use this header to address the data further on:

    val csv = sc.textFile("file.csv")  // original file
    val data = csv.map(line => line.split(",").map(elem => elem.trim)) // lines in rows
    val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
    val rows = data.filter(line => header(line, "user") != "user") // filter the header out
    val users = rows.map(row => header(row, "user"))
    val usersByHits = rows.map(row => header(row, "user") -> header(row, "hits").toInt)
    ...

Note that header is little more than a simple map of mnemonics to array indices. Nearly all of this could be done with the ordinal position of each element in the array, e.g. user = row(0).
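
For comparison, a minimal sketch of that positional alternative, assuming the column order user, topic, hits from the sample file above:

    // the same lookups by ordinal position instead of by header name
    val users = rows.map(row => row(0))
    val usersByHits = rows.map(row => row(0) -> row(2).toInt)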

PS: Welcome to Scala :-)

+50
Jun 19 '14 at 13:12

You can use the spark-csv library: https://github.com/databricks/spark-csv

This is directly from the documentation:

    import java.util.HashMap;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;

    SQLContext sqlContext = new SQLContext(sc);

    HashMap<String, String> options = new HashMap<String, String>();
    options.put("header", "true");
    options.put("path", "cars.csv");

    DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
+15
May 01 '15 at 1:56

Firstly, I have to say that it is much easier if you put the headers in separate files - this is a convention in big data.

In any case, Daniel's answer is pretty good, but it has an inefficiency and a bug, so I'm going to post my own. The inefficiency is that you don't need to check every record to see whether it is the header; you only need to check the first record of each partition. The bug is that with .split(",") you can get an exception thrown, or get the wrong column, when fields are empty strings and occur at the start or end of a record; to correct that you need to use .split(",", -1). So here is the complete code:

    val header = scala.io.Source.fromInputStream(
        hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration)
          .open(new hadoop.fs.Path(path)))
      .getLines.head

    val columnIndex = header.split(",").indexOf(columnName)

    sc.textFile(path).mapPartitions(iterator => {
      val head = iterator.next()
      if (head == header) iterator else Iterator(head) ++ iterator
    })
    .map(_.split(",", -1)(columnIndex))
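
To see why the -1 limit matters, here is a small illustration (something you can paste into a Scala REPL) of how trailing empty fields behave:

    // the default split drops trailing empty fields, so the column count is wrong
    "om,scala,".split(",")       // Array(om, scala)
    // a negative limit keeps them
    "om,scala,".split(",", -1)   // Array(om, scala, "")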

Final points: consider Parquet if you only want to fetch certain columns. Or at least consider implementing a lazily evaluated split function if you have wide rows.
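
To make the Parquet suggestion concrete, here is a rough sketch, assuming Spark 1.4+, a SQLContext named sqlContext, the spark-csv package from another answer, and the user/topic/hits sample file. Parquet's columnar layout means only the selected column is read from disk:

    // one-time conversion from CSV to Parquet
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("file.csv")
    df.write.parquet("file.parquet")

    // later: only the "user" column is read from disk
    val users = sqlContext.read.parquet("file.parquet").select("user")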

+8
Jun 20 '14 at 15:38

We can use the new DataFrameRDD to read and write CSV data. There are several advantages of DataFrameRDD over NormalRDD:

  • DataFrameRDD is a bit faster than NormalRDD because we define a schema, and that helps optimize execution time significantly, giving us a significant performance gain.
  • Even if a column is shifted in the CSV, it will automatically pick up the correct column, because we are not hard-coding the column number, as we would when reading the data as a textFile, splitting it, and then using the column index to get the data.
  • In a few lines of code, you can directly read the CSV file.

You need a library: add it to build.sbt

 libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0" 

Spark Scala code for it:

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val csvInPath = "/path/to/csv/abc.csv"
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .load(csvInPath)
    // format specifies the type of file you are reading
    // header = true indicates that the first line is the header

To convert it to a normal RDD, taking some columns from it:

    import org.apache.spark.sql.Row

    val rddData = df.map(x => Row(x.getAs("colA")))
    // do other RDD operations on it

Saving the RDD back out in CSV format:

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val aDf = sqlContext.createDataFrame(rddData, StructType(Array(StructField("colANew", StringType, true))))
    aDf.write.format("com.databricks.spark.csv").option("header", "true").save("/csvOutPath/aCSVOp")

Since header is set to true, we get the header names in all the output files.

+4
Sep 27 '15 at 5:05

I would recommend reading the header directly on the driver, not through Spark. Two reasons for this: 1) It is a single line, so there is no advantage to a distributed approach. 2) We need this line on the driver, not on the worker nodes.

It looks something like this:

    // Ridiculous amount of code to read one line.
    val uri = new java.net.URI(filename)
    val conf = sc.hadoopConfiguration
    val fs = hadoop.fs.FileSystem.get(uri, conf)
    val path = new hadoop.fs.Path(filename)
    val stream = fs.open(path)
    val source = scala.io.Source.fromInputStream(stream)
    val header = source.getLines.head

Now, when you make the RDD, you can drop the header:

 val csvRDD = sc.textFile(filename).filter(_ != header) 

Then we can make an RDD from just one column, for example:

    val idx = header.split(",").indexOf(columnName)
    val columnRDD = csvRDD.map(_.split(",")(idx))
+3
Jun 19 '14 at 10:31

Here is another example of using Spark/Scala to convert a CSV to an RDD. See the original post for a more detailed description.

    def main(args: Array[String]): Unit = {
      val csv = sc.textFile("/path/to/your/file.csv")

      // split / clean data
      val headerAndRows = csv.map(line => line.split(",").map(_.trim))
      // get header
      val header = headerAndRows.first
      // filter out header (eh. just check if the first val matches the first header name)
      val data = headerAndRows.filter(_(0) != header(0))
      // splits to map (header/value pairs)
      val maps = data.map(splits => header.zip(splits).toMap)
      // filter out the user "me"
      val result = maps.filter(map => map("user") != "me")
      // print result
      result.foreach(println)
    }
+3
Feb 10 '15 at 22:25

Another alternative is to use the mapPartitionsWithIndex method, since you get the partition index number and an iterator over all the lines in that partition. Partition 0, row 0 will be the header.

    import scala.collection.mutable.ArrayBuffer

    val rows = sc.textFile(path)
      .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) =>
        val results = new ArrayBuffer[String]
        var first = true
        while (rows.hasNext) {
          // check for first line
          if (index == 0 && first) {
            first = false
            rows.next // skip the first row
          } else {
            results += rows.next
          }
        }
        results.toIterator
      }, true)

    rows.flatMap { row => row.split(",") }
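
A more compact variant of the same idea (a sketch, assuming the header is the first line of partition 0) simply drops the first element of that partition's iterator, which also keeps the iterator lazy instead of buffering the whole partition in an ArrayBuffer:

    val rows = sc.textFile(path).mapPartitionsWithIndex(
      (index, iter) => if (index == 0) iter.drop(1) else iter,
      preservesPartitioning = true)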
+2
Feb 24 '15 at 18:16

How about this?

    val Delimeter = ","
    val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter))
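
Note that the header row is still in the resulting RDD. A minimal sketch for dropping it, reusing the same textFile value:

    // the first array is the split header line; filter it out by element-wise comparison
    val header = textFile.first()
    val data = textFile.filter(row => !row.sameElements(header))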
+1
Jun 19 '14 at 7:34

I suggest you try

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

    JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
      new Function<String, Person>() {
        public Person call(String line) throws Exception {
          String[] parts = line.split(",");
          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));
          return person;
        }
      });

In this example you should have a class matching your file's header specification, bind your data to that schema, and then apply criteria, as in MySQL, to get the desired result.
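
For reference, a rough Scala version of the same pattern from that guide, assuming Spark 1.x with a SQLContext named sqlContext:

    import sqlContext.implicits._

    // the case class acts as the schema, mirroring the file's header specification
    case class Person(name: String, age: Int)

    val people = sc.textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()

    // register the DataFrame as a table and query it with SQL, much like MySQL
    people.registerTempTable("people")
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")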

0
Aug 18 '14 at 12:15

I think you can try loading the CSV into an RDD and then creating a DataFrame from that RDD. Here is the documentation on creating a DataFrame from an RDD: http://spark.apache.org/docs/latest/sql-programming-guide.html#interoperating-with-rdds
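
A rough sketch of that route, assuming Spark 1.x, a SQLContext named sqlContext, and a comma-separated file whose first line is the header:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val csv = sc.textFile("/path/to/file.csv")
    val headerLine = csv.first()

    // build a schema from the header, treating every column as a string
    val schema = StructType(headerLine.split(",").map(name => StructField(name.trim, StringType, nullable = true)))

    // drop the header and convert each remaining line into a Row
    val rows = csv.filter(_ != headerLine).map(line => Row.fromSeq(line.split(",", -1).toSeq))

    val df = sqlContext.createDataFrame(rows, schema)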

0
Mar 24 '16 at 8:08

For Scala Spark, this is what I usually use when I cannot use the spark-csv packages...

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv")
    val header = rawdata.first()
    val tbldata = rawdata.filter(_ != header)
0
May 26 '16 at 17:34

Starting with Spark 2.0, CSV can be read directly into a DataFrame.

If the data file does not have a header row, it would be:

 val df = spark.read.csv("file://path/to/data.csv") 

This will load the data, but each column gets a generic name such as _c0, _c1, etc.

If there is a header row, then adding .option("header", "true") will use the first row to define the columns in the DataFrame:

    val df = spark.read
      .option("header", "true")
      .csv("file://path/to/data.csv")



For a specific example, suppose you have a file with the contents:

    user,topic,hits
    om,scala,120
    daniel,spark,80
    3754978,spark,1

Then, to get the total number of hits grouped by topic:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val rawData = spark.read
      .option("header", "true")
      .csv("file://path/to/data.csv")

    // specifies the query, but does not execute it
    val grouped = rawData.groupBy($"topic").agg(sum($"hits"))

    // runs the query, pulling the data to the master node
    // can fail if the amount of data is too much to fit
    // into the master node memory!
    val collected = grouped.collect

    // runs the query, writing the result back out
    // in this case, changing format to Parquet since that can
    // be nicer to work with in Spark
    grouped.write.parquet("hdfs://some/output/directory/")

    // runs the query, writing the result back out
    // in this case, in CSV format with a header and
    // coalesced to a single file. This is easier for human
    // consumption but usually much slower.
    grouped.coalesce(1)
      .write
      .option("header", "true")
      .csv("hdfs://some/output/directory/")
0
Jun 15 '17 at 0:40


