Convert a single column to multiple elements in a Spark Dataframe

I have a large data frame (over 1.2 GB) with this structure:

 +---------+--------------+------------------------------------------------------------------------------------------------------+
 | country | date_data    | text                                                                                                 |
 +---------+--------------+------------------------------------------------------------------------------------------------------+
 | "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n...........\nT_R: 45ee"         |
 | "EEUU"  | "2016-10-03" | "T_D: QQAA\nT_NAME: name_2\nT_IN: ind_2\nT_C: c1ws12\nT_ADD: Sec_1_P\n...........\nT_R: 46ee"         |
 | .       | .            | .                                                                                                    |
 | .       | .            | .                                                                                                    |
 | "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_300000\nT_IN: ind_65\nT_C: c1ws12\nT_ADD: Sec_1_P\n...........\nT_R: 47aa"   |
 +---------+--------------+------------------------------------------------------------------------------------------------------+

The data frame has 300,000 rows, and the text field is a single line of about 5,000 characters.

I would like to split the text field into new fields:

 +---------+------------+------+-------------+--------+--------+---------+--------+------+
 | country | date_data  | t_d  | t_name      | t_in   | t_c    | t_add   | ...... | t_r  |
 +---------+------------+------+-------------+--------+--------+---------+--------+------+
 | EEUU    | 2016-10-03 | QQWE | name_1      | ind_1  | c1ws12 | Sec_1_P | ...... | 45ee |
 | EEUU    | 2016-10-03 | QQAA | name_2      | ind_2  | c1ws12 | Sec_1_P | ...... | 45ee |
 | .       | .          | .    | .           | .      | .      | .       | .      | .    |
 | .       | .          | .    | .           | .      | .      | .       | .      | .    |
 | .       | .          | .    | .           | .      | .      | .       | .      | .    |
 | EEUU    | 2016-10-03 | QQWE | name_300000 | ind_65 | c1ws12 | Sec_1_P | ...... | 47aa |
 +---------+------------+------+-------------+--------+--------+---------+--------+------+

I am currently using regular expressions to solve this problem. First, I write the regular expressions and create a UDF to extract individual fields from the text (90 regular expressions in total):

    val D_text    = "((?<=T_D: ).*?(?=\\\\n))".r
    val NAME_text = "((?<=nT_NAME: ).*?(?=\\\\n))".r
    val IN_text   = "((?<=T_IN: ).*?(?=\\\\n))".r
    val C_text    = "((?<=T_C: ).*?(?=\\\\n))".r
    val ADD_text  = "((?<=T_ADD: ).*?(?=\\\\n))".r
    . . . .
    val R_text    = "((?<=T_R: ).*?(?=\\\\n))".r

    // UDF function: returns the first match of the pattern, or "NULL" if there is none
    def getFirst(pattern2: scala.util.matching.Regex) = udf(
      (url: String) => pattern2.findFirstIn(url) match {
        case Some(texst_new) => texst_new
        case None            => "NULL"
        case null            => "NULL"
      }
    )

Then I create a new DataFrame (tbl_separate_fields) by applying the regex UDFs to extract each new field from the text:

    val tbl_separate_fields = hiveDF.select(
      hiveDF("country"),
      hiveDF("date_data"),
      getFirst(D_text)(hiveDF("texst")).alias("t_d"),
      getFirst(NAME_text)(hiveDF("texst")).alias("t_name"),
      getFirst(IN_text)(hiveDF("texst")).alias("t_in"),
      getFirst(C_text)(hiveDF("texst")).alias("t_c"),
      getFirst(ADD_text)(hiveDF("texst")).alias("t_add"),
      . . . .
      getFirst(R_text)(hiveDF("texst")).alias("t_r")
    )

Finally, I insert this dataframe into the Hive table:

    tbl_separate_fields.registerTempTable("tbl_separate_fields")
    hiveContext.sql("INSERT INTO TABLE TABLE_INSERT PARTITION (date_data) SELECT * FROM tbl_separate_fields")

This solution takes about 1 hour for the entire data frame, so I want to optimize it and reduce the execution time. Is there a better solution?

We use Hadoop 2.7.1 and Apache Spark 1.5.1. The Spark configuration is:

    val conf = new SparkConf().set("spark.storage.memoryFraction", "0.1")
    val sc = new SparkContext(conf)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Thanks in advance.

EDIT - additional sample data (the fields do not always appear in the same order, and some records are missing fields):

 +---------+--------------+------------------------------------------------------------------------------------------------------+
 | country | date_data    | text                                                                                                 |
 +---------+--------------+------------------------------------------------------------------------------------------------------+
 | "EEUU"  | "2016-10-03" | "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12\nT_ADD: Sec_1_P\n...........\nT_R: 45ee"         |
 | "EEUU"  | "2016-10-03" | "T_NAME: name_2\nT_D: QQAA\nT_IN: ind_2\nT_C: c1ws12...........\nT_R: 46ee"                           |
 | .       | .            | .                                                                                                    |
 | .       | .            | .                                                                                                    |
 | "EEUU"  | "2016-10-03" | "T_NAME: name_300000\nT_ADD: Sec_1_P\nT_IN: ind_65\nT_C: c1ws12\n...........\nT_R: 47aa"              |
 +---------+--------------+------------------------------------------------------------------------------------------------------+
1 answer

Using regular expressions in this case is slow and also fragile: with one UDF per field, each row's 5,000-character text is scanned roughly 90 separate times.

If you know that all entries have the same structure, that is, that all "text" values have the same number and order of "parts", the following code will work (for any number of columns), mainly using split in org.apache.spark.sql.functions:

    import org.apache.spark.sql.functions._
    import scala.collection.mutable

    // first - split the "text" column values into Arrays
    val textAsArray: DataFrame = inputDF
      .withColumn("as_array", split(col("text"), "\n"))
      .drop("text")
      .cache()

    // get a sample (first row) to get the column names; can be skipped if you want to hard-code them:
    val sampleText = textAsArray.first().getAs[mutable.WrappedArray[String]]("as_array").toArray
    val columnNames: Array[(String, Int)] = sampleText.map(_.split(": ")(0)).zipWithIndex

    // add a column per columnName with the right value, and drop the no-longer-needed as_array column
    val withValueColumns: DataFrame = columnNames.foldLeft(textAsArray) {
      case (df, (colName, index)) =>
        df.withColumn(colName, split(col("as_array").getItem(index), ": ").getItem(1))
    }.drop("as_array")

    withValueColumns.show()

    // for the sample data I created,
    // with just 4 "parts" in the "text" column, this prints:
    // +-------+----------+----+------+-----+------+
    // |country| date_data| T_D|T_NAME| T_IN|   T_C|
    // +-------+----------+----+------+-----+------+
    // |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
    // |   EEUU|2016-10-03|QQAA|name_2|ind_2|c1ws12|
    // +-------+----------+----+------+-----+------+
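
For completeness, writing the result back to the partitioned Hive table works the same way as in the question. A minimal sketch, assuming the TABLE_INSERT table from the question and only the four sample fields shown above (the real query would list all extracted columns); note that for a dynamic-partition insert Hive expects the partition column date_data to come last in the SELECT:

    // hypothetical write-back, mirroring the insert step from the question;
    // the column list is illustrative only - the real table would have all ~90 extracted fields
    withValueColumns.registerTempTable("tbl_separate_fields")

    hiveContext.sql(
      """INSERT INTO TABLE TABLE_INSERT PARTITION (date_data)
        |SELECT country, T_D, T_NAME, T_IN, T_C, date_data
        |FROM tbl_separate_fields""".stripMargin)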

Alternatively, if the above assumption does not hold, you can use a UDF that converts the text column into a Map, and then perform a similar foldLeft over a hard-coded list of the desired columns:

    import sqlContext.implicits._

    // sample data: not the same order, not all records have all columns:
    val inputDF: DataFrame = sc.parallelize(Seq(
      ("EEUU", "2016-10-03", "T_D: QQWE\nT_NAME: name_1\nT_IN: ind_1\nT_C: c1ws12"),
      ("EEUU", "2016-10-03", "T_D: QQAA\nT_IN: ind_2\nT_NAME: name_2")
    )).toDF("country", "date_data", "text")

    // hard-coded list of expected column names:
    val columnNames: Seq[String] = Seq("T_D", "T_NAME", "T_IN", "T_C")

    // UDF to convert the text into a key-value map
    val asMap = udf[Map[String, String], String] { s =>
      s.split("\n").map(_.split(": ")).map { case Array(k, v) => k -> v }.toMap
    }

    val textAsMap = inputDF.withColumn("textAsMap", asMap(col("text"))).drop("text")

    // for each column name - look up the value in the map
    val withValueColumns: DataFrame = columnNames.foldLeft(textAsMap) {
      case (df, colName) => df.withColumn(colName, col("textAsMap").getItem(colName))
    }.drop("textAsMap")

    withValueColumns.show()

    // prints:
    // +-------+----------+----+------+-----+------+
    // |country| date_data| T_D|T_NAME| T_IN|   T_C|
    // +-------+----------+----+------+-----+------+
    // |   EEUU|2016-10-03|QQWE|name_1|ind_1|c1ws12|
    // |   EEUU|2016-10-03|QQAA|name_2|ind_2|  null|
    // +-------+----------+----+------+-----+------+
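
One caveat about the asMap UDF above: the pattern case Array(k, v) will throw a MatchError if a line has no ": " separator or if a value itself contains ": ". A small defensive variant (my assumption about the data, not part of the original answer) that splits only on the first ": " and silently skips malformed lines:

    // defensive variant of asMap (assumption: values may themselves contain ": ")
    val asMapSafe = udf[Map[String, String], String] { s =>
      s.split("\n")
        .map(_.split(": ", 2))                              // limit = 2: split on the first ": " only
        .collect { case Array(k, v) => k.trim -> v.trim }   // skip lines without a ": " separator
        .toMap
    }

It can be used as a drop-in replacement for asMap in the code above.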