Creating a Hive table using Parquet file metadata

I wrote a DataFrame out as a Parquet file, and I would like to read that file with Hive, building the table from the metadata stored in the Parquet files.

Writing parquet output

 _common_metadata
 _metadata
 _SUCCESS
 part-r-00000-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet
 part-r-00001-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet
 part-r-00002-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet
 part-r-00003-0def6ca1-0f54-4c53-b402-662944aa0be9.gz.parquet

Hive table

 CREATE TABLE testhive
 ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 STORED AS
   INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION '/home/gz_files/result';

 FAILED: SemanticException [Error 10043]: Either list of columns or a custom serializer should be specified

How can I extract the column metadata from a Parquet file so that I can build this table?

If I open _common_metadata, the contents look like this:

 PAR1LHroot %TSN% %TS% %Etype% )org.apache.spark.sql.parquet.row.metadata
 {"type":"struct","fields":[
   {"name":"TSN","type":"string","nullable":true,"metadata":{}},
   {"name":"TS","type":"string","nullable":true,"metadata":{}},
   {"name":"Etype","type":"string","nullable":true,"metadata":{}}]}

Or, how can I parse the metadata file?

6 answers

Here is the solution I came up with to get metadata from parquet files to create a Hive table.

Start the Spark shell first (or compile it all into a jar and run it with spark-submit, but the shell is so much easier).

 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.DataFrame

 val df = sqlContext.parquetFile("/path/to/_common_metadata")

 def creatingTableDDL(tableName: String, df: DataFrame): String = {
   val cols = df.dtypes
   var ddl1 = "CREATE EXTERNAL TABLE " + tableName + " ("
   // look at the column names and data types and build a comma-separated column list
   val colCreate = (for (c <- cols) yield (c._1 + " " + c._2.replace("Type", ""))).mkString(", ")
   ddl1 += colCreate + ") STORED AS PARQUET LOCATION '/wherever/you/store/the/data/'"
   ddl1
 }

 val test_tableDDL = creatingTableDDL("test_table", df)

It will provide you with the data types that Hive will use for each column, as they are stored in Parquet. For example:

 CREATE EXTERNAL TABLE test_table (COL1 Decimal(38,10), COL2 String, COL3 Timestamp) STORED AS PARQUET LOCATION '/path/to/parquet/files'
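To actually create the table from that string, you can run it from the same shell; a minimal sketch, assuming sqlContext is a HiveContext (i.e. a spark-shell built with Hive support):

 // Register the table in the Hive metastore using the DDL generated above, then read it back.
 sqlContext.sql(test_tableDDL)
 sqlContext.sql("SELECT * FROM test_table LIMIT 5").show()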


I just wanted to expand on James Tobin's answer. There is a StructField class that provides Hive data types without performing string replacements.

 // Tested on Spark 1.6.0.
 import org.apache.spark.sql.DataFrame

 def dataFrameToDDL(dataFrame: DataFrame, tableName: String): String = {
   val columns = dataFrame.schema.map { field =>
     " " + field.name + " " + field.dataType.simpleString.toUpperCase
   }
   s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
 }

This solves the IntegerType problem: the string-replacement approach produces Integer, which Hive does not accept, while simpleString produces int (uppercased to INT).

 scala> val dataFrame = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("x", "y")
 dataFrame: org.apache.spark.sql.DataFrame = [x: int, y: string]

 scala> print(dataFrameToDDL(dataFrame, "t"))
 CREATE TABLE t (
  x INT,
  y STRING
 )

This should work with any DataFrame, not just one read from Parquet (for example, I use this with a JDBC DataFrame).
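For illustration, a hedged sketch of calling the same function on a JDBC DataFrame (Spark 1.6 DataFrameReader API; the connection URL, table name, and credentials are placeholders):

 // Requires the matching JDBC driver on the classpath; all connection values are placeholders.
 val jdbcDf = sqlContext.read.format("jdbc")
   .option("url", "jdbc:postgresql://dbhost:5432/mydb")
   .option("dbtable", "public.some_table")
   .option("user", "some_user")
   .option("password", "some_password")
   .load()
 println(dataFrameToDDL(jdbcDf, "some_table"))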

As an added bonus, if your target DDL supports nullable columns, you can extend the function by checking StructField.nullable.
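One way to sketch that extension (assuming the target dialect accepts NOT NULL in column definitions):

 def dataFrameToDDLWithNullability(dataFrame: DataFrame, tableName: String): String = {
   val columns = dataFrame.schema.map { field =>
     // Append NOT NULL only when the schema marks the field as non-nullable.
     val constraint = if (field.nullable) "" else " NOT NULL"
     " " + field.name + " " + field.dataType.simpleString.toUpperCase + constraint
   }
   s"CREATE TABLE $tableName (\n${columns.mkString(",\n")}\n)"
 }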


A slight improvement over Victor's answer: backtick-quoting field.name and binding the table to a local Parquet path as an external table (tested on Spark 1.6.1):

 def dataFrameToDDL(dataFrame: DataFrame, tableName: String, absFilePath: String): String = {
   val columns = dataFrame.schema.map { field =>
     " `" + field.name + "` " + field.dataType.simpleString.toUpperCase
   }
   s"CREATE EXTERNAL TABLE $tableName (\n${columns.mkString(",\n")}\n) STORED AS PARQUET LOCATION '" + absFilePath + "'"
 }

Also note that (a short usage sketch follows this list):

  • HiveContext is needed because SQLContext does not support creating an external table.
  • The path to the parquet folder must be an absolute path.
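A minimal usage sketch under those assumptions (Spark 1.6.x shell; the table name and path are illustrative):

 import org.apache.spark.sql.hive.HiveContext

 val hiveContext = new HiveContext(sc)                             // a plain SQLContext cannot create the external table
 val df = hiveContext.read.parquet("/absolute/path/to/parquet")    // must be an absolute path
 val ddl = dataFrameToDDL(df, "my_table", "/absolute/path/to/parquet")
 hiveContext.sql(ddl)                                              // registers the external table in the Hive metastore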

I would like to expand on James's answer.

The following code will work for all data types, including ARRAY, MAP, and STRUCT.

Tested on Spark 2.2.

 val df = sqlContext.parquetFile("parquetFilePath")
 val schema = df.schema
 val columns = schema.fields
 val tableName = "hive_test1"   // the original snippet left tableName undefined; this name is assumed
 var ddl1 = "CREATE EXTERNAL TABLE " + tableName + " ("
 val cols = (for (column <- columns) yield column.name + " " + column.dataType.sql).mkString(",")
 ddl1 = ddl1 + cols + " ) STORED AS PARQUET LOCATION '/tmp/hive_test1/'"
 spark.sql(ddl1)
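For illustration, a small sketch of what DataType.sql produces for complex types (Spark 2.x types API; the schema below is made up), which is why the generated DDL still parses:

 import org.apache.spark.sql.types._

 // Build a schema with nested types and print the SQL type string for each field.
 val complexSchema = StructType(Seq(
   StructField("id", IntegerType),
   StructField("tags", ArrayType(StringType)),
   StructField("attrs", MapType(StringType, StringType)),
   StructField("address", StructType(Seq(
     StructField("city", StringType),
     StructField("zip", StringType))))))

 complexSchema.fields.foreach(f => println(f.name + " " + f.dataType.sql))
 // Prints SQL type strings such as INT, ARRAY<STRING>, MAP<STRING, STRING>
 // and a STRUCT<...> for the nested column.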

I had the same question. This can be difficult to implement in practice, though, since Parquet supports schema evolution:

http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-0-x/topics/impala_parquet.html#parquet_schema_evolution_unique_1

For example, you can add a new column to your table without touching the data that is already in the table; only new data files will carry the new (backward-compatible) metadata.

Schema merging has been disabled by default since Spark 1.5.0 because it is a "relatively expensive operation" (see http://spark.apache.org/docs/latest/sql-programming-guide.html#schema-merging). So inferring the latest schema may not be as simple as it sounds, although quick-and-dirty approaches are quite possible, for example by parsing the output of:

 $ parquet-tools schema /home/gz_files/result/000000_0 
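That said, if you do want Spark to reconcile the footers of all the files, schema merging can still be enabled for a single read; a minimal sketch, using the result path from the question:

 // Opt back in to schema merging for this read only (off by default since Spark 1.5.0).
 val mergedDf = sqlContext.read.option("mergeSchema", "true").parquet("/home/gz_files/result")
 mergedDf.printSchema()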

In fact, Impala supports

 CREATE TABLE LIKE PARQUET 

(no column list at all):

http://www.cloudera.com/content/www/en-us/documentation/archive/impala/2-x/2-1-x/topics/impala_create_table.html

Your question is tagged both hive and spark, and I don't see this implemented in Hive, but if you are using CDH, this may be what you were looking for.
