Spark SQL Performance

My algorithm is shown below.

Step 1: get the HBase entity data into hBaseRDD.

JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); 

Step 2: convert hBaseRDD to rowPairRDD.

  // In rowPairRDD the key is the HBase row key, and Row is the HBase row data.
  JavaPairRDD<String, Row> rowPairRDD = hBaseRDD.mapToPair(***);
  dataRDD.repartition(500);
  dataRDD.cache();
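
For reference, a minimal sketch of what the elided mapToPair(***) function might look like with the old Spark 1.x Java API; the column family "cf" and the qualifier names are placeholders, not the real schema:

  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.function.PairFunction;
  import org.apache.spark.sql.api.java.Row;
  import scala.Tuple2;

  JavaPairRDD<String, Row> rowPairRDD = hBaseRDD.mapToPair(
      new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Row>() {
        @Override
        public Tuple2<String, Row> call(Tuple2<ImmutableBytesWritable, Result> entry) {
          Result result = entry._2();
          String rowKey = Bytes.toString(result.getRow());
          // "cf" and the column qualifiers below are placeholder names.
          byte[] cf = Bytes.toBytes("cf");
          String column1 = Bytes.toString(result.getValue(cf, Bytes.toBytes("column1")));
          String column2 = Bytes.toString(result.getValue(cf, Bytes.toBytes("column2")));
          String column3 = Bytes.toString(result.getValue(cf, Bytes.toBytes("column3")));
          return new Tuple2<String, Row>(rowKey, Row.create(column1, column2, column3));
        }
      });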

Step 3: convert rowPairRDD to schemaRDD.

  JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowPairRDD.values(), schema);
  schemaRDD.registerTempTable("testentity");
  sqlContext.sqlContext().cacheTable("testentity");
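
The schema variable is not shown in the question; with the old Spark 1.x Java API it might be built roughly like this (the column names and types are assumptions):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.spark.sql.api.java.DataType;
  import org.apache.spark.sql.api.java.StructField;
  import org.apache.spark.sql.api.java.StructType;

  // Placeholder columns; the real entity schema may differ.
  List<StructField> fields = new ArrayList<StructField>();
  fields.add(DataType.createStructField("column1", DataType.StringType, true));
  fields.add(DataType.createStructField("column2", DataType.StringType, true));
  fields.add(DataType.createStructField("column3", DataType.StringType, true));
  StructType schema = DataType.createStructType(fields);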

Step 4: use Spark SQL to run the first simple SQL query.

  JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
  JavaSchemaRDD retRDD = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value1'");
  List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Step 5: use Spark SQL to run the second simple SQL query.

  JavaSchemaRDD retRDD = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value2'");
  List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

Step 6: use Spark SQL to run the third simple SQL query.

  JavaSchemaRDD retRDD = sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value3'");
  List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

The test results are shown below:

Test Case 1:

When I insert 300,000 records into the HBase table and then run the code:

  • the first query takes 60,407 ms
  • the second query takes 838 ms
  • the third query takes 792 ms

If I use the HBase API to execute a similar query, it takes only 2,000 ms. So the last two Spark SQL queries are much faster than the HBase API query.
I believe the first Spark SQL query spends most of its time loading the data from HBase, which is why it is much slower than the last two queries. I think this result is expected.

Test Case 2:

When I insert 400,000 records into the HBase table and then run the code:

  • the first query takes 87,213 ms
  • the second query takes 83,238 ms
  • the third query takes 82,092 ms

If I use the HBase API to execute a similar query, it takes only 3,500 ms. So all three Spark SQL queries are much slower than the HBase API query.
The last two Spark SQL queries are also very slow, with performance similar to the first query. Why? And how can I tune the performance?

2 answers

I suspect that you are trying to cache more data than you have allocated to your Spark instance. I'll break down what happens on each execution of the same query.

First of all, everything in Spark is lazy. This means that when you call rdd.cache(), nothing actually happens until you perform an action on the RDD.

First query

  • Full HBase scan (slow)
  • Repartitioning (causes a shuffle, slow)
  • The data is actually cached into memory at this point, because Spark is lazy (somewhat slow)
  • The where predicate is applied (fast)
  • The results are collected

Second / third query

  • Full in-memory scan (fast)
  • The where predicate is applied (fast)
  • The results are collected

Spark will try to cache as much of the RDD as it can. If it cannot cache all of it, you may run into serious slowdowns. This is especially true if one of the steps before caching causes a shuffle: you may end up repeating steps 1-3 of the first query for every subsequent query. That is not ideal.

To see whether you are not fully caching the RDD, go to the Spark Web UI (http://localhost:4040 if running in local standalone mode) and look at the RDD storage/persistence information. Make sure it is 100% cached.

Edit (from the comments):

For 400,000 records the size of the data in my database is only about 250 MB. Why do I need to use 2 GB to fix the problem (when 1 GB > 250 MB)?

I can't say for sure why you are hitting the memory limit with spark.executor.memory=1G, but I'll add some more relevant information about caching.

  • Spark only allocates a fraction of the executor heap memory for caching. By default this is spark.storage.memoryFraction=0.6, or 60%, so you really only get 1 GB * 0.6.
  • The total space used in HBase is likely to differ from the total heap space used when caching in Spark. By default, Spark does not serialize Java objects when storing them in memory, so there is a fair amount of Java object metadata overhead. You can change the default persistence level (a sketch follows this list).
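
A minimal sketch of those two knobs, assuming the rowPairRDD from the question; the memory values are examples only, not tuned recommendations:

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.storage.StorageLevel;

  // Example values only: give the executor more heap and/or adjust the cache fraction.
  SparkConf conf = new SparkConf()
      .setAppName("hbase-spark-sql")                 // placeholder app name
      .set("spark.executor.memory", "2g")
      .set("spark.storage.memoryFraction", "0.6");   // 0.6 is the default cache fraction
  JavaSparkContext jsc = new JavaSparkContext(conf);

  // Store the RDD serialized instead of rdd.cache() (MEMORY_ONLY, i.e. deserialized
  // Java objects) to cut down the per-object metadata overhead:
  rowPairRDD.persist(StorageLevel.MEMORY_ONLY_SER());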

Do you know how to cache all the data up front to avoid the poor performance of the first query?

Calling any action on the RDD will cause it to be cached. Just do:

 scala> rdd.cache
 scala> rdd.count

Now it is cached.
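
In the Java code from the question, a rough equivalent (assuming the same sqlContext and schemaRDD objects) is to trigger one cheap, throwaway action right after caching, so the first real query does not pay for the full HBase scan:

  // cacheTable("testentity") is lazy: the cache only fills on the first query.
  // Warm it up with a throwaway query before the real ones:
  sqlContext.sql("SELECT COUNT(*) FROM testentity").collect();

  // Equivalently, any action on the cached RDD itself works (cf. the Scala snippet above):
  // schemaRDD.cache();
  // schemaRDD.count();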


I assume you execute these queries one after another. If so, why do you create a separate sqlContext for each query? You can also try repartitioning the RDD, which will increase parallelism, and cache the RDD if possible (see the sketch below).
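
A small sketch of those suggestions, reusing the variable names from the question (an illustration, not the original code):

  // One JavaSQLContext, created once and reused for every query.
  JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);

  // repartition() returns a new RDD, so assign the result before caching it.
  JavaPairRDD<String, Row> repartitioned = rowPairRDD.repartition(500);
  repartitioned.cache();

  JavaSchemaRDD schemaRDD = sqlContext.applySchema(repartitioned.values(), schema);
  schemaRDD.registerTempTable("testentity");
  sqlContext.sqlContext().cacheTable("testentity");

  // All queries now share the same context and the same cached table.
  List<org.apache.spark.sql.api.java.Row> rows1 =
      sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value1'").collect();
  List<org.apache.spark.sql.api.java.Row> rows2 =
      sqlContext.sql("SELECT column1, column2 FROM testentity WHERE column3 = 'value2'").collect();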

Hopefully the above steps will improve performance.

