How to save partitioned parquet file in Spark 2.1?

I am trying to test how to write data to HDFS 2.7 using Spark 2.1. My data is a simple sequence of dummy values, and the output should be partitioned by the attributes id and key.

 // Simple case class to model the data
 case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)

 // Actual data to be stored
 val testData = Seq(
    SimpleTest("test", 12, 13.5.toFloat, 1),
    SimpleTest("test", 12, 13.5.toFloat, 2),
    SimpleTest("test", 12, 13.5.toFloat, 3),
    SimpleTest("simple", 12, 13.5.toFloat, 1),
    SimpleTest("simple", 12, 13.5.toFloat, 2),
    SimpleTest("simple", 12, 13.5.toFloat, 3)
 )

 // Spark workflow to distribute, partition and store
 // sc and sql are the SparkContext and SparkSession, respectively
 val testDataP = sc.parallelize(testData, 6)
 val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
 testDf.write.partitionBy("id", "key").parquet("/path/to/file")

I expect to get the following tree structure in HDFS:

- /path/to/file
   |- /id=test/key=1/part-01.parquet
   |- /id=test/key=2/part-02.parquet
   |- /id=test/key=3/part-03.parquet
   |- /id=simple/key=1/part-04.parquet
   |- /id=simple/key=2/part-05.parquet
   |- /id=simple/key=3/part-06.parquet

But when I run the previous code, I get the following output:

/path/to/file/id=/key=24/
 |-/part-01.parquet
 |-/part-02.parquet
 |-/part-03.parquet
 |-/part-04.parquet
 |-/part-05.parquet
 |-/part-06.parquet

I don't know whether something is wrong in my code, or whether Spark is doing something I am not expecting.

I run spark-submit as follows:

spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass myFatJar.jar
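For reference, most of these settings can also be applied when building the SparkSession. The sketch below simply mirrors the flags above (same names and values, nothing beyond them); note that memory and executor settings normally only take effect when passed at submit time, before the driver JVM starts, so they are omitted here:

import org.apache.spark.sql.SparkSession

// Rough programmatic equivalent of the spark-submit flags above (sketch only)
val spark = SparkSession.builder()
  .appName("APP")
  .master("local")
  .config("spark.io.compression.codec", "lzf")          // --conf spark.io.compression.codec=lzf
  .config("spark.driver.maxResultSize", "1g")           // --conf spark.driver.maxResultSize=1g
  .config("spark.sql.parquet.filterPushdown", "true")   // --conf spark.sql.parquet.filterPushdown=true
  .getOrCreate()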


I found a solution! According to Cloudera, it is a mapred-site.xml configuration problem (see the reference link below). Also, instead of writing the dataframe as testDf.write.partitionBy("id", "key").parquet("/path/to/file"), I did it as follows:

testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file")

where <namenode> and <port> are the HDFS master node's hostname and port, respectively.
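If you prefer not to hardcode the namenode and port, one option is to read fs.defaultFS from the Hadoop configuration and prefix the path with it. A minimal sketch, assuming an active SparkSession named spark and HDFS as the default filesystem:

// Derive the fully-qualified URI instead of hardcoding namenode and port
val defaultFs = spark.sparkContext.hadoopConfiguration.get("fs.defaultFS")
testDf.write.partitionBy("id", "key").parquet(s"$defaultFs/path/to/file")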

Special thanks to @jacek-laskowski for his valuable contribution.

References:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090



Interesting... well... "it works for me".

Since you describe your dataset with the case class SimpleTest in Spark 2.1, you are just an import spark.implicits._ away from having a strongly-typed Dataset.

In my case, spark is your sql.

In other words, you don't have to create testDataP and testDf (via sql.createDataFrame) at all.

import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")

The resulting directory structure (after saving to /tmp/testDf):

$ tree /tmp/testDf/
/tmp/testDf/
β”œβ”€β”€ _SUCCESS
β”œβ”€β”€ id=simple
β”‚   β”œβ”€β”€ key=1
β”‚   β”‚   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
β”‚   β”œβ”€β”€ key=2
β”‚   β”‚   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
β”‚   └── key=3
β”‚       └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
    β”œβ”€β”€ key=1
    β”‚   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    β”œβ”€β”€ key=2
    β”‚   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    └── key=3
        └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet

8 directories, 7 files
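As a quick sanity check, you can read the partitioned output back: the partition columns id and key are reconstructed from the directory names, and filters on them prune whole directories. A minimal sketch, assuming the same spark session and the import spark.implicits._ from above:

// Read the partitioned dataset back; id and key come from directory names
val readBack = spark.read.parquet("/tmp/testDf")
// Filtering on partition columns scans only the matching directories,
// e.g. only id=test/key=1 here
readBack.filter($"id" === "test" && $"key" === 1).show()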
