Below is an example of the code that I am running. When this Spark job runs, the DataFrame joins are performed with a SortMergeJoin instead of a BroadcastJoin.
    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.broadcast.Broadcast

    def joinedDf(sqlContext: SQLContext,
                 txnTable: DataFrame,
                 countriesDfBroadcast: Broadcast[DataFrame]): DataFrame = {
      import sqlContext.implicits._
      // Join the large transaction table with the small countries table,
      // which was wrapped in an sc.broadcast beforehand.
      txnTable.as("df1")
        .join(
          countriesDfBroadcast.value
            .withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID")
            .as("countries"),
          $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
          "inner")
    }

    joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
The broadcast does not happen even when I specify the broadcast() hint in the join statement.
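For reference, this is the form of hint I mean, a minimal sketch using org.apache.spark.sql.functions.broadcast (countriesDf here stands for the plain, un-broadcast DataFrame, a name I am assuming for illustration):

    import org.apache.spark.sql.functions.broadcast

    // Marking the small side with broadcast() should make the planner
    // pick BroadcastHashJoin over SortMergeJoin.
    val hinted = txnTable.as("df1").join(
      broadcast(countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")),
      $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID",
      "inner")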
Instead, the optimizer hash-partitions the DataFrames, and this causes data skew.
Has anyone seen this behavior?
I am running this on YARN, using Spark 1.6 with HiveContext as the SQLContext. The Spark job runs with 200 executors. The txnTable data size is 240 GB, and the countriesDf data size is 5 MB.
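At 5 MB, the small table should also be under Spark's default automatic broadcast threshold (spark.sql.autoBroadcastJoinThreshold, 10 MB by default). A quick way to confirm the effective setting, assuming sqlContext is the HiveContext above:

    // Default is 10485760 (10 MB); countriesDf at 5 MB should qualify.
    println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold"))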