How to use a subquery for the dbtable option in a JDBC data source?

I want to use Spark to process some data from a JDBC source. But first, instead of reading the original tables directly from JDBC, I want to run some queries on the JDBC side to filter the columns and join the tables, and then load the result of that query into Spark SQL as a table.

The following syntax for loading the original JDBC table works for me:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="mydb.table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver" # mysql JDBC driver 5.1.41
).load() 
df_table1.show() # succeeded

According to the Spark documentation (I am using PySpark 1.6.3):

dbtable: The JDBC table to be read. Note that you can use everything that is valid in the FROM clause of the SQL query. For example, instead of a full table, you can also use a subquery in parentheses.

Therefore, just for the experiment, I tried something simple:

df_table1 = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://foo.com:3306",
    dbtable="(SELECT * FROM mydb.table1) AS table1",
    user="me",
    password="******",
    driver="com.mysql.jdbc.Driver"
).load() # failed

It fails with the following error:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'table1 WHERE 1=0' at line 1

I tried several variations of this syntax (with/without parentheses, with/without the "AS" alias, etc.), but none of them worked. Is it a syntax error or something else? And where does the "WHERE 1 = 0" part come from? Strange!

+8
4 answers

Passing an SQL query instead of a table name as dbtable works for me in Spark SQL:

val df_table1 = sqlContext.read.format("jdbc").options(Map(
    ("url" -> "jdbc:postgresql://localhost:5432/mydb"),
    ("dbtable" -> "(select * from table1) as table1"),
    ("user" -> "me"),
    ("password" -> "******"),
    ("driver" -> "org.postgresql.Driver"))
).load()

I tested this with PostgreSQL; I have not tried it with MySQL.
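If you prefer to stay in PySpark, here is a minimal sketch of the equivalent call, reusing the same PostgreSQL connection details as above (the PostgreSQL JDBC driver jar still has to be on Spark's classpath):

df_table1 = sqlContext.read.format("jdbc").options(
    url="jdbc:postgresql://localhost:5432/mydb",
    dbtable="(select * from table1) as table1",  # subquery in parentheses plus an alias
    user="me",
    password="******",
    driver="org.postgresql.Driver"
).load()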

+3

I am guessing it is a bug in Spark SQL.

The error most likely comes from the line in the Spark source that builds the schema-resolution query. It uses Scala string interpolation, substituting your dbtable value for table:

s"SELECT * FROM $table WHERE 1=0"

You can see the 'table1 WHERE 1=0' from the error message in that pattern: with your subquery substituted, the statement sent to MySQL becomes:

SELECT * FROM (select * from table1) as table1 WHERE 1=0

which looks incorrect.

There is a MySQL-specific dialect - MySQLDialect - that overrides getTableExistsQuery with its own:

override def getTableExistsQuery(table: String): String = {
  s"SELECT 1 FROM $table LIMIT 1"
}

so my bet is that the other method, getSchemaQuery, is the source of the error. That is unlikely though, given that you use Spark 1.6.3 while the method carries the @Since("2.1.0") annotation.

I'd highly recommend checking the logs of the MySQL database to see which query is actually executed and leads to the error message.
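If you cannot get at the server logs, one way to see how MySQL reacts to the generated statement is to run it with a plain MySQL client, outside of Spark. A minimal sketch, assuming the mysql-connector-python package (my choice for illustration, not something the question uses):

import mysql.connector  # pip install mysql-connector-python

# Rebuild the statement Spark composes for schema resolution and send it to
# MySQL directly, so the JDBC layer is taken out of the picture.
dbtable = "(SELECT * FROM mydb.table1) AS table1"
probe = "SELECT * FROM {} WHERE 1=0".format(dbtable)

conn = mysql.connector.connect(host="foo.com", port=3306,
                               user="me", password="******")
cur = conn.cursor()
cur.execute(probe)        # raises ProgrammingError if MySQL rejects the syntax
cur.fetchall()            # no rows expected because of WHERE 1=0
print(cur.column_names)   # column names resolved by the server
cur.close()
conn.close()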

+2

With Spark 2.2 on Python and MySQL (5.7.19), the following works for me when I pass the subquery as table="(SELECT * FROM a_table) AS my_table".

from pyspark.sql import SparkSession

# Build a session with the MySQL JDBC driver jar on the classpath
my_spark = SparkSession \
    .builder.appName("myApp") \
    .config("jars", "/usr/local/spark-2.2.2-bin-hadoop2.7/jars/mysql-connector-java-5.1.45-bin.jar") \
    .getOrCreate()

# The subquery goes where a table name would normally go; the AS alias is required
my_df = my_spark.read.jdbc(
    url="jdbc:mysql://my_host:3306/my_db",
    table="(SELECT * FROM a_table) AS my_table",
    properties={'user': 'my_username', 'password': 'my_password'}
)

my_df.head(20)
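The same pattern can push a filter or a join down to MySQL instead of loading whole tables into Spark first, which is what the question is ultimately after. A sketch; the table and column names below are made up purely for illustration:

# Made-up tables/columns, only to illustrate pushing a join down to MySQL.
pushdown_query = """
    (SELECT o.id, o.amount, c.name
     FROM orders o
     JOIN customers c ON c.id = o.customer_id
     WHERE o.amount > 100) AS filtered_orders
"""

joined_df = my_spark.read.jdbc(
    url="jdbc:mysql://my_host:3306/my_db",
    table=pushdown_query,
    properties={'user': 'my_username', 'password': 'my_password'}
)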
+2
table = ("(SELECT id, person, manager, CAST(tdate AS CHAR) AS tdate, "
         "CAST(start AS CHAR) AS start, CAST(end AS CHAR) AS end, "
         "CAST(duration AS CHAR) AS duration FROM EmployeeTimes) AS EmployeeTimes")

spark = get_spark_session()  # helper in my code that returns a SparkSession
df = spark.read.format("jdbc").options(
    url=ip,  # JDBC URL, e.g. jdbc:mysql://host:3306/db
    driver='com.mysql.jdbc.Driver',
    dbtable=table,
    user=username,
    password=password
).load()

I had a lot of problems with Spark JDBC being incompatible with MySQL timestamps. The trick is to convert all of your timestamp or duration values to strings before JDBC touches them: simply cast the values to CHAR in the query and it will work.

Note: you will also need to use AS to give the query an alias for it to work.
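After loading, the CHAR columns can be cast back to proper Spark types. A minimal sketch, assuming Spark 2.2+ and the column names from the query above (the cast for duration is a guess about its original type):

from pyspark.sql import functions as F

# Convert the string columns produced by the CASTs back to Spark types.
df = (df
      .withColumn("tdate", F.to_date("tdate"))
      .withColumn("start", F.to_timestamp("start"))
      .withColumn("end", F.to_timestamp("end"))
      .withColumn("duration", F.col("duration").cast("double")))  # adjust to your schema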

+2
