In Spark 2.1+ you can use Spark SQL's bucketing feature to create bucketed tables, join them, and so on, much as Hive does, avoiding both the data exchange (shuffle) and the sort step. See https://issues.apache.org/jira/browse/SPARK-15453 for details.
First, build a test DataFrame:
// 80,000 rows, coalesced to a single partition so that each bucket is written as one file
val df = (0 until 80000).map(i => (i, i.toString, i.toString)).toDF("item_id", "country", "state").coalesce(1)
", Hive."
df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test")
17/03/13 15:12:01 WARN HiveExternalCatalog: Persisting bucketed data source table `kofeng`.`lstg_bucket_test` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
df.write.bucketBy(100, "country", "state").sortBy("country", "state").saveAsTable("kofeng.lstg_bucket_test2")
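To confirm that the bucket spec was persisted, you can inspect the table metadata. A quick sketch (the exact layout of the output varies across Spark versions; for bucketed tables it includes entries such as Num Buckets and Bucket Columns):
// Print the full table metadata, including the bucketing information.
spark.sql("DESCRIBE FORMATTED kofeng.lstg_bucket_test").collect().foreach(println)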
Next, disable broadcast joins so that the planner chooses a sort merge join:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("Spark SQL basic example").config("spark.sql.autoBroadcastJoinThreshold", "0").getOrCreate()
The plan below is from Spark 2.1.0; in Spark 2.0 the exchange is already avoided for bucketed tables, but the sort is not (SPARK-15453, fixed in 2.1.0, removed the unnecessary sort).
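If you already have a running session, the same setting can also be changed at runtime; a small sketch (setting the threshold to -1, like 0, effectively disables broadcast joins):
// Disable broadcast joins on the current session.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")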
val query = """
|SELECT *
|FROM
| kofeng.lstg_bucket_test a
|JOIN
| kofeng.lstg_bucket_test2 b
|ON a.country=b.country AND
| a.state=b.state
""".stripMargin
val joinDF = spark.sql(query)
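For reference, the same join expressed through the DataFrame API produces the identical plan; a sketch (joinDF2 is just an illustrative name):
val a = spark.table("kofeng.lstg_bucket_test")
val b = spark.table("kofeng.lstg_bucket_test2")
// Joining on the bucket/sort columns yields the same sort merge join.
val joinDF2 = a.join(b, Seq("country", "state"))
joinDF2.explain()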
scala> joinDF.queryExecution.executedPlan
res10: org.apache.spark.sql.execution.SparkPlan =
*SortMergeJoin [country#..., state#...], [country#..., state#...], Inner
:- *Project [item_id#..., country#..., state#...]
:  +- *Filter (isnotnull(country#...) && isnotnull(state#...))
:     +- *FileScan parquet kofeng.lstg_bucket_test[item_id#...,country#...,state#...] ...
+- *Project [item_id#..., country#..., state#...]
   +- *Filter (isnotnull(country#...) && isnotnull(state#...))
      +- *FileScan parquet kofeng.lstg_bucket_test2[item_id#...,country#...,state#...] ...
Note that the plan contains neither an Exchange nor a Sort operator: both tables are already bucketed and sorted on the join keys, so the SortMergeJoin reads them directly.
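For contrast, joining unbucketed copies of the same data shows the operators that bucketing removes. A hedged sketch, with kofeng.lstg_plain_test and kofeng.lstg_plain_test2 as hypothetical unbucketed tables:
// Hypothetical unbucketed copies of the same DataFrame.
df.write.saveAsTable("kofeng.lstg_plain_test")
df.write.saveAsTable("kofeng.lstg_plain_test2")
val plainJoin = spark.table("kofeng.lstg_plain_test").join(spark.table("kofeng.lstg_plain_test2"), Seq("country", "state"))
// Expect Exchange hashpartitioning(...) and Sort operators above each
// FileScan in this plan; these are exactly what the bucketed tables avoid.
plainJoin.queryExecution.executedPlan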