spark.sql.codegen.wholeStage is enabled by default since Spark 2.0.
The older spark.sql.codegen flag (Spark 1.3+) defaults to false; to inspect the generated plan/code, use DF.explain or the debug package.
If possible, please use Spark 2+.
Note the API changed between 1.3 and 1.4+: the DataFrame API was reworked, and Hive tables were accessed through hiveContext.
- Since Spark 2.0, DataFrame is just an alias for Dataset[Row], so the DataFrame operations below apply to Datasets as well.
- The example below builds a DataFrame from a Hive table, derives columns, and runs grouped aggregations (avg, sum, count) with ordering.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"

val spark = SparkSession
  .builder()
  .appName("Spark Hive Aggregations")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

// These imports require the `spark` session value, so they must come AFTER it is created.
import spark.implicits._
import spark.sql

// Source rows for a TPC-H Q1-style aggregation.
// NOTE: l_tax must be selected here because sum_charge below reads it;
// the duplicated l_quantity/l_extendedprice columns served no purpose and were dropped.
val df: DataFrame = sql(
  """select l_returnflag, l_linestatus, l_quantity, l_extendedprice, l_discount, l_tax
    |from lineitem
    |where l_shipdate <= '1998-09-16'""".stripMargin)

// Derived measures. One could also use a Spark UDF or when(cond, value)
// instead of a direct column expression.
// lit(1) is required: plain Int has no `-`/`+` overload taking a Column.
val df1 = df
  .withColumn("sum_disc_price", df.col("l_extendedprice") * (lit(1) - df.col("l_discount")))
  .withColumn("sum_charge", df.col("l_extendedprice") * (lit(1) + df.col("l_tax")))

// SUM, AVG and COUNT grouped by return flag and line status.
val groupeddf = df1
  .groupBy(
    df1.col("l_returnflag"), // was "returnflag" — no such column
    df1.col("l_linestatus")
  )
  .agg(
    avg(df1.col("l_quantity")),
    avg(df1.col("l_extendedprice")),
    avg(df1.col("l_discount")),
    sum(df1.col("l_quantity")),
    sum(df1.col("l_extendedprice")),
    sum(df1.col("sum_disc_price")),
    sum(df1.col("sum_charge")),
    count(df1.col("l_linestatus")).as("cnt") // alias the count, not its input column
  )
  // A later .sort(...) REPLACES a prior .orderBy(...), so ask for both keys at once.
  .orderBy("l_returnflag", "l_linestatus")

// Project the grouping keys plus the named aggregate; add the avg/sum columns as needed.
val finalDF = groupeddf.select("l_returnflag", "l_linestatus", "cnt")