Load CSV file as DataFrame?

I would like to read a CSV file in Spark, convert it to a DataFrame, and store it in HDFS using df.registerTempTable("table_name")

I tried:

 scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") 

The error I received is:

 java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
     at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
     at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
     at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
     at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
     at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
     at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load a CSV file as a DataFrame in Apache Spark?

+106
scala hadoop hdfs apache-spark apache-spark-sql spark-dataframe
Apr 17 '15 at 16:10
13 answers

spark-csv is part of Spark's core functionality and does not require a separate library. So you could just do, for example,

 df = spark.read.format("csv").option("header", "true").load("csvfile.csv") 

In Scala (this works for any delimited format, "," for CSV, "\t" for TSV, etc.):

 val df = sqlContext.read.format("com.databricks.spark.csv").option("delimiter", ",").load("csvfile.csv")
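
For instance, a TSV read along the same lines might look like this (a sketch only; the file name and the header option are illustrative, not from the original answer):

 val tsvDf = sqlContext.read.format("com.databricks.spark.csv")
   .option("delimiter", "\t")
   .option("header", "true")
   .load("tsvfile.tsv")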

+127
Apr 17 '15 at 17:33

Parse CSV and load as DataFrame / DataSet with Spark 2.x

Initialize the SparkSession object; in the shells it is available by default as spark

 val spark = org.apache.spark.sql.SparkSession.builder
   .master("local")
   .appName("Spark CSV Reader")
   .getOrCreate

Use any of the following ways to load the CSV as a DataFrame/DataSet

1. Do it programmatically

  val df = spark.read
    .format("csv")
    .option("header", "true") // first line in file has headers
    .option("mode", "DROPMALFORMED")
    .load("hdfs:///csv/file/dir/file.csv")

2. You can also do it the SQL way

  val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

Dependencies:

  "org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0, 

Spark version <2.0

 val df = sqlContext.read
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .option("mode", "DROPMALFORMED")
   .load("csv/file/path")

Dependencies:

 "org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST, 
+144
Sep 16 '16 at 14:01

This is for those on Hadoop 2.6 and Spark 1.6, without the "databricks" package.

 import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
 import org.apache.spark.sql.Row

 val csv = sc.textFile("/path/to/file.csv")
 val rows = csv.map(line => line.split(",").map(_.trim))
 val header = rows.first
 val data = rows.filter(_(0) != header(0))
 val rdd = data.map(row => Row(row(0), row(1).toInt))
 val schema = new StructType()
   .add(StructField("id", StringType, true))
   .add(StructField("val", IntegerType, true))
 val df = sqlContext.createDataFrame(rdd, schema)
+13
Mar 20 '17 at 8:35

With Spark 2.0, the following is how you can read a CSV:

 val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
 val sc = new SparkContext(conf)
 val sparkSession = SparkSession.builder
   .config(conf = conf)
   .appName("spark session example")
   .getOrCreate()
 val path = "/Users/xxx/Downloads/usermsg.csv"
 val base_df = sparkSession.read.option("header", "true").csv(path)
+11
Sep 27 '16 at 3:17

In Java 1.8, this piece of code works great for reading CSV files

POM.xml

 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>2.0.0</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.10</artifactId>
     <version>2.0.0</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
 <dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
     <version>2.11.8</version>
 </dependency>
 <dependency>
     <groupId>com.databricks</groupId>
     <artifactId>spark-csv_2.10</artifactId>
     <version>1.4.0</version>
 </dependency>

Java

 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
 // create Spark Context
 SparkContext context = new SparkContext(conf);
 // create Spark Session
 SparkSession sparkSession = new SparkSession(context);

 Dataset<Row> df = sparkSession.read()
     .format("com.databricks.spark.csv")
     .option("header", true)
     .option("inferSchema", true)
     .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");

 System.out.println("========== Print Schema ============");
 df.printSchema();
 System.out.println("========== Print Data ==============");
 df.show();
 System.out.println("========== Print title ==============");
 df.select("title").show();
+8
Oct 18 '16 at 10:05

Penny's Spark 2 example is the way to do it in Spark 2. There is one more trick: have the header generated for you by doing an initial scan of the data, setting the inferSchema option to true.

Here, for example, assuming spark is the Spark session you have set up, is the operation to load the CSV index file of all the Landsat images that Amazon hosts on S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements. See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License. You may obtain a copy of the License at
   *
   * http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */

  val csvdata = spark.read.options(Map(
      "header" -> "true",
      "ignoreLeadingWhiteSpace" -> "true",
      "ignoreTrailingWhiteSpace" -> "true",
      "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
      "inferSchema" -> "true",
      "mode" -> "FAILFAST"))
    .csv("s3a://landsat-pds/scene_list.gz")

The bad news: this triggers a scan through the file; for something big like this 20+ MB zipped CSV file, that can take 30 seconds over a slow connection. Bear that in mind: you are better off manually coding the schema once you know what is coming in.

(Code snippet licensed under the Apache Software License 2.0 to avoid any ambiguity; something I did as a demo/integration test of S3 integration.)
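
Following the advice above, here is a minimal sketch of hard-coding the schema instead of inferring it (the column names are hypothetical, not the real scene_list layout):

 import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType}

 // Hypothetical columns; replace with the actual ones once you have seen the file
 val sceneSchema = new StructType()
   .add(StructField("entityId", StringType, true))
   .add(StructField("acquisitionDate", TimestampType, true))
   .add(StructField("processingLevel", StringType, true))

 val csvdata = spark.read
   .option("header", "true")
   .schema(sceneSchema)   // no inferSchema, so no extra scan over the file
   .csv("s3a://landsat-pds/scene_list.gz")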

+4
Oct 18 '16 at 19:34

There are many challenges in parsing a CSV file, and they keep adding up as the file gets larger or when column values contain non-English characters, escape characters, separators, and the like, any of which can lead to parsing errors.

That is where the magic in the options comes in. The ones below worked for me and should hopefully cover most of the edge cases:

 ### Create a Spark Session
 spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

 ### Note the options that are used. You may have to tweak these in case of error
 html_df = spark.read.csv(html_csv_file_path,
                          header=True,
                          multiLine=True,
                          ignoreLeadingWhiteSpace=True,
                          ignoreTrailingWhiteSpace=True,
                          encoding="UTF-8",
                          sep=',',
                          quote='"',
                          escape='"',
                          maxColumns=2,
                          inferSchema=True)

Hope this helps. See: Using PySpark 2 to Read CSV with HTML Source

Note: the above code is from the Spark 2 API, where the CSV file reader API comes bundled with the built-in packages of the Spark installation.

Note: PySpark is the Python wrapper for Spark and shares the same API as Scala/Java.
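
For comparison, a rough Scala sketch of the same read with the same options (the session name and file path are placeholders, and multiLine needs Spark 2.2+):

 import org.apache.spark.sql.SparkSession

 val spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()

 val htmlCsvFilePath = "path/to/file.csv"       // placeholder path
 val htmlDf = spark.read
   .option("header", "true")
   .option("multiLine", "true")                 // requires Spark 2.2+
   .option("ignoreLeadingWhiteSpace", "true")
   .option("ignoreTrailingWhiteSpace", "true")
   .option("encoding", "UTF-8")
   .option("sep", ",")
   .option("quote", "\"")
   .option("escape", "\"")
   .option("maxColumns", "2")
   .option("inferSchema", "true")
   .csv(htmlCsvFilePath)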

+3
Aug 21 '18 at 10:57

In case you are building a jar with Scala 2.11 and Apache Spark 2.0 or higher:

There is no need to create a sqlContext or sparkContext object; a single SparkSession object covers all needs.

Below is my code that works fine:

 import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
 import org.apache.log4j.{Level, LogManager, Logger}

 object driver {

   def main(args: Array[String]) {
     val log = LogManager.getRootLogger
     log.info("**********JAR EXECUTION STARTED**********")

     val spark = SparkSession.builder()
       .master("local")
       .appName("ValidationFrameWork")
       .getOrCreate()

     val df = spark.read.format("csv")
       .option("header", "true")
       .option("delimiter", "|")
       .option("inferSchema", "true")
       .load("d:/small_projects/spark/test.pos")

     df.show()
   }
 }

If you are running on a cluster, just change .master("local") to .master("yarn") when defining the sparkBuilder object.
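
A minimal sketch of what that builder might look like for YARN (the app name is arbitrary):

 val spark = SparkSession.builder()
   .master("yarn")
   .appName("ValidationFrameWork")
   .getOrCreate()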

The Spark documentation covers this: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

+1
Oct 30 '18 at 1:59

Try this if using spark 2.0.0+

 # For a non-HDFS file:
 df = spark.read.csv("file:///csvfile.csv")

 # For an HDFS file:
 df = spark.read.csv("hdfs:///csvfile.csv")

 # For an HDFS file with a delimiter other than comma:
 df = spark.read.option("delimiter", "|").csv("hdfs:///csvfile.csv")

Note: this works for any delimited file. Just use option("delimiter", ...) to change the value.

Hope this is helpful.

+1
Jun 28 '19 at 18:25

The default file format with spark.read is Parquet, and you are reading a CSV file, which is why you are getting the exception. Specify the CSV format explicitly with the API you are using.
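
For example, a minimal sketch with a Spark 2.x session (the path is the one from the question; the header option is just illustrative):

 // spark.read.load(...) assumes Parquet unless told otherwise; name the format explicitly
 val df = spark.read
   .format("csv")
   .option("header", "true")
   .load("hdfs:///csv/file/dir/file.csv")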

0
Aug 01 '18 at 6:30

Loads a CSV file and returns the result as a DataFrame.

 df = sparksession.read.option("header", true).csv("file_name.csv")

The DataFrame treats the file as being in CSV format.

0
Mar 03 '19 at 10:29

I do the same as below:

 val conf = new SparkConf().setAppName("Test Spark").setMaster("local[1]")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
 val txtDf = sqlContext.read.format("com.databricks.spark.csv")
   .option("header", "true")
   .load("D:\\spark-training\\employee.txt")
 txtDf.registerTempTable("employee")
 val employees = sqlContext.sql("select * from employee")
 employees.printSchema()
 employees.show()
-1
Jul 14 '17 at 7:07

NOTE. For versions of Spark earlier than 1.7

  val dataframe = sqlContext.read.format("com.databricks.spark.csv")
    .option("delimiter", "\t")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("file_name")

For CSV, use "," as the delimiter and change the options as needed, for example header and inferSchema. For Python, just drop the val and it works the same. But you need to pass this package either to your spark-shell or to spark-submit:

 spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
 or
 spark-submit --packages com.databricks:spark-csv_2.10:1.4.0
-1
Oct 20 '17 at 21:40
