Spark SQL / Hive query takes forever with a join

I'm doing something that should be simple, but apparently it isn't in Spark SQL.

If I run the following query in MySQL, it finishes in a split second:

SELECT ua.address_id
FROM user u
inner join user_address ua on ua.address_id = u.user_address_id
WHERE u.user_id = 123;

However, executing the same query in a HiveContext under Spark (1.5.1) takes more than 13 seconds. Adding more joins makes the query run for a very long time (over 10 minutes). I'm not sure what I'm doing wrong here, and how I can speed up the process.

The tables are MySQL tables that are loaded into the Hive context as temporary tables. Spark runs as a single instance, with the database on a remote machine.

  • The user table has about 4.8 million rows.
  • The user_address table contains 350,000 rows.

The tables have foreign key fields, but no explicit FK relationships are defined in the database. I am using InnoDB.

Spark execution plan:

== Physical Plan ==
TungstenProject [address_id#0L]
 SortMergeJoin [user_address_id#27L], [address_id#52L]
  TungstenSort [user_address_id#27L ASC], false, 0
   TungstenExchange hashpartitioning(user_address_id#27L)
    ConvertToUnsafe
     Filter (user_id#0L = 123)
      Scan JDBCRelation(jdbc:mysql://.user,[Lorg.apache.spark.Partition;@596f5dfc,{user=, password=, url=jdbc:mysql://, dbtable=user})[address_id#0L,user_address_id#27L]
  TungstenSort [address_id#52L ASC], false, 0
   TungstenExchange hashpartitioning(address_id#52L)
    ConvertToUnsafe
     Scan JDBCRelation(jdbc:mysql://.user_address,[Lorg.apache.spark.Partition;@2ce558f3,{user=, password=, url=jdbc:mysql://, dbtable=user_address})[address_id#52L]

+4
2 answers

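First of all, the type of query you perform is extremely inefficient. As of now (Spark 1.5.0 *), to perform a join like this, both tables have to be shuffled / hash-partitioned each time the query is executed. That shouldn't be a problem for the users table, where the user_id = 123 predicate is most likely pushed down, but it still requires a full shuffle of user_address.

Moreover, if the tables are only registered and not cached, every execution of this query fetches the whole user_address table from MySQL into Spark.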
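As a rough illustration of the caching point, the sketch below registers the JDBC-backed table and asks Spark to keep it in memory, so repeated queries do not re-read it from MySQL. It assumes an existing `SQLContext` (or `HiveContext`) and a placeholder JDBC URL; the helper name `loadAndCacheUserAddress` is hypothetical and not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: `jdbcUrl` is a placeholder for the real MySQL URL.
def loadAndCacheUserAddress(sqlContext: SQLContext, jdbcUrl: String): DataFrame = {
  val userAddress = sqlContext.read.format("jdbc").options(
    Map(
      "url" -> jdbcUrl,
      "dbtable" -> "user_address")).load()

  // Make the table visible to SQL queries under the name used in the question.
  userAddress.registerTempTable("user_address")

  // Mark the table for in-memory caching. This call is lazy: the data is
  // fetched from MySQL and cached by the first query that scans the table.
  sqlContext.cacheTable("user_address")

  userAddress
}
```

Caching only removes the repeated fetch over JDBC; the join itself still shuffles the cached data on every execution.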

"I'm not sure what I'm doing wrong here, and how I can speed up the process."

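It is not exactly clear why you want to use Spark for this application, but the single-machine setup, the small data, and the type of queries suggest that Spark is not a good fit here.

Generally speaking, if the application logic requires single-record access, Spark SQL won't perform well. It is designed for analytical queries, not as an OLTP database replacement.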

If a single table / data frame is much smaller, you could try broadcasting.

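import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import sqlContext.implicits._  // enables the $"column" syntax

val user: DataFrame = ???
val user_address: DataFrame = ???

// Filter first so that only the matching rows are broadcast
val userFiltered = user.where(???)

// Broadcasting the small, filtered side avoids shuffling user_address
user_address.join(
  broadcast(userFiltered), $"address_id" === $"user_address_id")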

* This should change in Spark 1.6.0 with SPARK-11410, which should enable persistent table partitioning.

+4

I had the same problem in a similar situation (Spark 1.5.1, PostgreSQL 9.4).

Given two tables like these:

val t1 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t1")).load()

val t2 = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "t2")).load()

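A join in HQL over the registered temp tables then leads to a full table scan of one of the tables (in my case, the child).

Anyway, a workaround is to push the query down to the underlying RDBMS: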

val joined = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:postgresql:db",
    "dbtable" -> "(select t1.*, t2.* from t1 inner join t2 on ...) as t")).load()

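This way, the query optimizer of the underlying RDBMS kicks in, and in my case it switched to index scans. Spark, on the other hand, pushes down two independent queries, and the RDBMS cannot really optimize those.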
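Applied to the query from the question, the same workaround might look like the sketch below. It is only an illustration: the helper name `addressForUser` and the `jdbcUrl` parameter are hypothetical, and it assumes the MySQL JDBC driver is on the classpath. The whole join is passed as a parenthesized subquery in `dbtable`, so MySQL executes it and Spark only receives the result rows.

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: `jdbcUrl` is a placeholder for the real MySQL URL.
def addressForUser(sqlContext: SQLContext, jdbcUrl: String, userId: Long): DataFrame = {
  // The join and the filter run inside MySQL. `userId` is a Long, so
  // interpolating it into the SQL text cannot inject arbitrary SQL.
  val pushedDownQuery =
    "(SELECT ua.address_id " +
    "FROM user u " +
    "INNER JOIN user_address ua ON ua.address_id = u.user_address_id " +
    s"WHERE u.user_id = $userId) AS t"

  sqlContext.read.format("jdbc").options(
    Map(
      "url" -> jdbcUrl,
      "dbtable" -> pushedDownQuery)).load()
}
```

A call such as `addressForUser(sqlContext, jdbcUrl, 123L).show()` would then transfer only the matching rows instead of both full tables.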

+3

Source: https://habr.com/ru/post/1618236/

