How to read only n lines of a large CSV file on HDFS using spark-csv?

I have a large distributed file on HDFS, and every time I use sqlContext with spark-csv package, it first downloads the whole file, which takes quite a lot of time.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

now, since I just want to do some quick checks at times, all I need is some / any n lines of the whole file.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

but they all start after the file is downloaded. Can't I just limit the number of lines when reading the file itself? I mean the equivalent of n_rows pandas in spark-csv, for example:

pd_df = pandas.read_csv("file_path", nrows=20)

Or it may happen that the spark does not actually download the file, the first step, but in this case, why does my step of loading the file take too long?

I want to

df.count()

n, , ?

+11
4

limit(n).

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

20 .

+12

, spark-csv , , , . spark-csv , inferSchema ( , ).

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
+8

limit(n), .

f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

.. inferschema='true', , , , .

, .:)

+2

Starting with PySpark 2.3, you can simply load the data as text, limit and apply the csv reader to the result:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Scala counterpart is available since Spark 2.2:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

In Spark 3.0.0 or later, you can also use the limit function and use it from_csv, but this requires a circuit, so it probably will not meet your requirements.

0
source

Source: https://habr.com/ru/post/1678221/


All Articles