Split a dataset based on column values in Spark

I am trying to split a dataset into separate datasets based on the contents of the Manufacturer column, but it is very slow. Please suggest a way to improve the code below so that it executes faster and needs less hand-written Java.

    List<Row> lsts = countsByAge.collectAsList();
    for (Row lst : lsts) {
        String man = lst.toString();
        man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
        Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
        DF.show();
    }

The code, input, and output datasets are listed below.

    package org.sparkexample;

    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SparkSession;

    public class GroupBy {
        public static void main(String[] args) {
            System.setProperty("hadoop.home.dir", "C:\\winutils");
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
            SQLContext sqlContext = new SQLContext(sc);
            SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
            sc.setLogLevel("ERROR");

            // Read the CSV file with a header row.
            Dataset<Row> src = sqlContext.read()
                    .format("com.databricks.spark.csv")
                    .option("header", "true")
                    .load("sample.csv");

            // Collect the distinct manufacturers on the driver, then run
            // one filter per value -- this loop is the slow part.
            Dataset<Row> unq_manf = src.select("Manufacturer").distinct();
            List<Row> lsts = unq_manf.collectAsList();
            for (Row lst : lsts) {
                String man = lst.toString();
                man = man.replaceAll("[\\p{Ps}\\p{Pe}]", ""); // strip the [] that Row.toString() adds
                Dataset<Row> DF = src.filter("Manufacturer='" + man + "'");
                DF.show();
            }
        }
    }

INPUT TABLE:

    +------+------------+--------------------+---+
    |ItemID|Manufacturer|       Category name|UPC|
    +------+------------+--------------------+---+
    |   804|         ael|Brush & Broom Han...|123|
    |   805|         ael|Wheel Brush Parts...|124|
    |   813|         ael|      Drivers Gloves|125|
    |   632|        west|       Pipe Wrenches|126|
    |   804|         bil|     Masonry Brushes|127|
    |   497|        west|   Power Tools Other|128|
    |   496|        west|   Power Tools Other|129|
    |   495|         bil|           Hole Saws|130|
    |   499|         bil|    Battery Chargers|131|
    |   497|        west|   Power Tools Other|132|
    +------+------------+--------------------+---+

OUTPUT:

    +------------+
    |Manufacturer|
    +------------+
    |         ael|
    |        west|
    |         bil|
    +------------+

    +------+------------+--------------------+---+
    |ItemID|Manufacturer|       Category name|UPC|
    +------+------------+--------------------+---+
    |   804|         ael|Brush & Broom Han...|123|
    |   805|         ael|Wheel Brush Parts...|124|
    |   813|         ael|      Drivers Gloves|125|
    +------+------------+--------------------+---+

    +------+------------+-----------------+---+
    |ItemID|Manufacturer|    Category name|UPC|
    +------+------------+-----------------+---+
    |   632|        west|    Pipe Wrenches|126|
    |   497|        west|Power Tools Other|128|
    |   496|        west|Power Tools Other|129|
    |   497|        west|Power Tools Other|132|
    +------+------------+-----------------+---+

    +------+------------+----------------+---+
    |ItemID|Manufacturer|   Category name|UPC|
    +------+------------+----------------+---+
    |   804|         bil| Masonry Brushes|127|
    |   495|         bil|       Hole Saws|130|
    |   499|         bil|Battery Chargers|131|
    +------+------------+----------------+---+

Thanks.

1 answer

In this case, you have two options:

  • First collect the unique manufacturer values, then map over the resulting array (a Java version is sketched after this list):

    val df = Seq(("HP", 1), ("Brother", 2), ("Canon", 3), ("HP", 5)).toDF("k", "v")
    val brands = df.select("k").distinct.collect.flatMap(_.toSeq)
    val BrandArray = brands.map(brand => df.where($"k" <=> brand))

    BrandArray.foreach { x =>
      x.show()
      println("---------------------------------------")
    }
  • You can also write the data frame out partitioned by the manufacturer column (see the second sketch after this list).

    df.write.partitionBy("k").saveAsTable("parquet")
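
Since the question's code is Java, here is a minimal Java sketch of the first option, assuming the same sample.csv and Manufacturer column as in the question (the class name is only illustrative). Reading the value with Row.getString(0) removes the need for the toString()-plus-regex cleanup, and caching src keeps each filter from re-reading the CSV:

    import java.util.List;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SplitByManufacturer {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("split datasets")
                    .master("local[*]")
                    .getOrCreate();

            Dataset<Row> src = spark.read()
                    .option("header", "true")
                    .csv("sample.csv");

            // Cache once so each per-manufacturer filter below does not
            // re-read and re-parse the CSV.
            src.cache();

            // Read the value directly from the Row instead of cleaning up
            // Row.toString() with a regex.
            List<Row> manufacturers = src.select("Manufacturer").distinct().collectAsList();
            for (Row r : manufacturers) {
                String man = r.getString(0);
                Dataset<Row> part = src.filter(src.col("Manufacturer").equalTo(man));
                part.show();
            }
        }
    }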

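And a minimal sketch of the second option applied to the question's data; the output path is a placeholder. One distributed write replaces the whole collect-and-filter loop and produces a sub-directory per manufacturer (e.g. Manufacturer=ael/):

    // Continuing from the `src` dataset in the sketch above; the
    // output path is a placeholder.
    src.write()
       .partitionBy("Manufacturer")
       .parquet("output/by_manufacturer");

A single partition can then be read back on its own, e.g. spark.read().parquet("output/by_manufacturer/Manufacturer=ael").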
