Sparksql.sql.codegen gives no improvement

I execute the query in spark sql as shown below. Table data is stored in two different nodes in hive tables.

But since the request is a bit slow, I am trying to find some options in spark mode so that the request can be executed faster. Therefore, I have found that we can customize sparksql.sql.codegenand spark.sql.inMemoryColumnarStorage.compressedto true instead of the default false.

But I have no improvements, the request with these two parameters in true is 4.1 minutes to complete. With these parameters, false also takes 4.1 minutes.

Do you understand why this option does not work?

   query = hiveContext.sql("""select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus""");

query.collect();
+1
source share
1 answer

spark.sql.codegen.wholeStage 2.0. .

spark.sql.codegen ( Spark 1.3+) false. , , DF.explain / debug

, pls. , 2+, .

i.e 1.3 1.4+, DataFrame , , hiveContext.


  • , Dataset [Row], DataFrame , .

.

- , , , .

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._
import spark.sql


// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Aggregations")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()


val df : DataFrame = sql(("""select l_returnflag, l_linestatus,l_quantity,l_extendedprice,l_quantity ,l_extendedprice,l_quantity, l_extendedprice, l_discount from
        lineitem where l_shipdate <= '1998-09-16""");

// can use spark udf or when(cond, evaluation), instead of direct expression
 val df1 =  df.withColumn("sum_disc_price", df.col("l_extendedprice") * (1 - df.col("l_discount"))
          .withColumn("sum_charge", df.col("l_extendedprice") * (1 + df.col("l_tax"))

//NOW SUM, AVG and group by  on dataframe
val groupeddf = df1.groupBy(
  df1.col("returnflag")
, df1.col("l_linestatus")
.agg(
      avg(df1.col("l_quantity")),
    , avg(df1.col("l_extendedprice"))
    , avg(df1.col("l_discount"))
    , sum(df1.col("l_quantity"))
    , sum(df1.col("l_extendedprice"))
    , sum(df1.col("sum_disc_price"))
    , sum(df1.col("sum_charge"))
    , count(df1.col("l_linestatus").as("cnt")
    ) //end agg
    ) //end group by 
//order by on dataframe  
.orderBy("l_returnflag"))
.sort("l_linestatus")
val finalDF = groupeddf.select("l_returnflag","l_linestatus",............. etc);
  • , , / ..
+1

Source: https://habr.com/ru/post/1660543/


All Articles