Spark RDD union is very slow

I have two Spark RDDs, dataRDD and newPairDataRDD, which are used for a Spark SQL query. When my application starts, dataRDD is initialized: all data in one specified HBase entity is stored in dataRDD.

When a client's SQL query arrives, my app fetches all new updates and inserts into newPairDataRDD. dataRDD is then unioned with newPairDataRDD and registered as a table in the Spark SQL context.

I found that even with 0 records in dataRDD and 1 newly inserted record in newPairDataRDD, the union takes 4 seconds. That is too slow.

I don't think this is reasonable. Does anyone know how to make it faster? Thanks. The simplified code is below:

    // Step 1: load all data from HBase into dataRDD at startup; this runs only once.
    JavaPairRDD<String, Row> dataRDD = getAllBaseDataToJavaRDD();
    dataRDD.cache();
    dataRDD.persist(StorageLevel.MEMORY_ONLY());
    logger.info(dataRDD.count());

    // Step 2: when a Spark SQL query arrives, load the latest updated and inserted data from the DB into newPairDataRDD.
    JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();

    // Step 3: if count > 0, do the union and reduce.
    if (newPairDataRDD.count() > 0) {
        JavaPairRDD<String, Row> unionedRDD = dataRDD.union(newPairDataRDD);
        // If data was updated in the DB, the old version must be removed from dataRDD.
        dataRDD = unionedRDD.reduceByKey(
            new Function2<Row, Row, Row>() {
                // @Override
                public Row call(Row r1, Row r2) {
                    return r2;
                }
            });
    }

    // Step 4: register dataRDD.
    JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD.values(), schema);

    // Step 5: execute the SQL query.
    retRDD = sqlContext.sql(sql);
    List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

In the Spark web UI I see the following. Apparently the union stage takes 4 s.

Completed Stages (8)

Stage Id  Description                            Submitted      Duration  Tasks: Succeeded/Total  Input   Shuffle Read  Shuffle Write
6         collect at SparkPlan.scala:85          1/4/2015 8:17  2 s       8/8                     -       156.0 B       -
7         union at SparkSqlQueryForMarsNew.java  1/4/2015 8:17  4 s       8/8                     64.0 B  -             156.0 B

+5
2 answers

A more efficient way to achieve what you want is to use cogroup() and flatMapValues(). A union does very little except add new partitions to dataRDD, which means that all of the data has to be shuffled before the reduceByKey(). With cogroup() and flatMapValues(), only newPairDataRDD is repartitioned (this holds when dataRDD already has a partitioner, for example after a partitionBy()).

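    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    // Group old and new rows by key; in the Spark 1.x Java API the grouped values are Iterables.
    JavaPairRDD<String, Tuple2<Iterable<Row>, Iterable<Row>>> unionedRDD =
        dataRDD.cogroup(newPairDataRDD);

    // Per key, keep the new rows if there are any, otherwise keep the old ones.
    JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
        new Function<Tuple2<Iterable<Row>, Iterable<Row>>, Iterable<Row>>() {
            public Iterable<Row> call(Tuple2<Iterable<Row>, Iterable<Row>> grouped) {
                if (grouped._2().iterator().hasNext()) {
                    return grouped._2();
                } else {
                    return grouped._1();
                }
            }
        });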

Or in Scala:

    val unioned = dataRDD.cogroup(newPairDataRDD)
    val updated = unioned.flatMapValues { case (oldVals, newVals) =>
      if (newVals.nonEmpty) newVals else oldVals
    }

Disclaimer: I'm not used to writing Spark in Java! Someone please correct me if this is wrong!
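
To make the per-key rule easier to check without a cluster, here is a minimal plain-Java sketch of what the cogroup() plus flatMapValues() combination computes. It does not use Spark at all: the class name CogroupSketch, the method mergeNewestWins and the use of String in place of Row are hypothetical and chosen only for illustration. Because old and new values stay in separate groups, the result does not depend on the order in which two rows are handed to a reduce function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Spark-free illustration of the "new values win" merge.
class CogroupSketch {

    // For each key, keep the new values when any exist, otherwise the old ones.
    public static Map<String, List<String>> mergeNewestWins(
            Map<String, List<String>> oldData,
            Map<String, List<String>> newData) {
        // Like cogroup, visit every key that appears on either side.
        Set<String> keys = new LinkedHashSet<>(oldData.keySet());
        keys.addAll(newData.keySet());

        Map<String, List<String>> merged = new LinkedHashMap<>();
        for (String key : keys) {
            List<String> oldVals = oldData.getOrDefault(key, new ArrayList<>());
            List<String> newVals = newData.getOrDefault(key, new ArrayList<>());
            // Same decision as the flatMapValues function in the answer.
            merged.put(key, newVals.isEmpty() ? oldVals : newVals);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> oldData = new LinkedHashMap<>();
        oldData.put("row1", Arrays.asList("v1"));
        oldData.put("row2", Arrays.asList("v1"));

        Map<String, List<String>> newData = new LinkedHashMap<>();
        newData.put("row2", Arrays.asList("v2")); // update of an existing key
        newData.put("row3", Arrays.asList("v1")); // insert of a new key

        System.out.println(mergeNewestWins(oldData, newData));
    }
}
```

Here row1 keeps its old value, row2 takes the updated value and row3 is added, which is the behaviour the question asks of the union and reduceByKey() step.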

+1

Try repartitioning your RDDs:

    JavaPairRDD<String, Row> unionedRDD = dataRDD
        .repartition(sc.defaultParallelism() * 3)
        .union(newPairDataRDD.repartition(sc.defaultParallelism() * 3));

0

Source: https://habr.com/ru/post/1210375/

