How to read data from a database in Spark in parellel

I need to read data from a DB2 database using Spark SQL (since Sqoop is missing)

I know about this function that will read data in parellel, opening multiple connections

jdbc(url: String, table: String, columnName: String, lowerBound: Long,upperBound: Long, numPartitions: Int, connectionProperties: Properties)

My problem is that I don't have a column that is incremental like this. Also, I need to read data through Query just because my table is quite large. Does anyone know how to read data through the API, or do I need to create something on my own

+4
source share
3 answers

, table . Spark SQL WHERE. , , , .

val dataframe = sqlContext.read.format("jdbc").option("url", "jdbc:db2://localhost/sparksql").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "table").option("user", "root").option("password", "root").load()
dataframe.registerTempTable("table")
dataframe.sqlContext.sql("select * from table where dummy_flag=1").collect.foreach(println)
+2

Saurabh, , Spark JDBC, numPartitions, .

Spark - , SQL- . , - , max min.

DB2 MPP, , DB2 :

var df = spark.read.
format("jdbc").
option("url", "jdbc:db2://<DB2 server>:<DB2 port>/<dbname>").
option("user", "<username>").
option("password", "<password>").
option("dbtable", "<your table>").
option("partitionColumn", "DBPARTITIONNUM(<a column name>)").
option("lowerBound", "<lowest partition number>").
option("upperBound", "<largest partition number>").
option("numPartitions", "<number of partitions>").
load()

, , DBPARTITIONNUM() .

, DB2 MPP, SQL:

SELECT min(member_number), max(member_number), count(member_number) 
FROM TABLE(SYSPROC.DB_MEMBERS())

, , SQL :

SELECT t2.DBPARTITIONNUM, t3.HOST_NAME
 FROM SYSCAT.TABLESPACES as t1,  SYSCAT.DBPARTITIONGROUPDEF as t2,
      SYSCAT.TABLES t4, TABLE(SYSPROC.DB_MEMBERS()) as t3 
 WHERE t1.TBSPACEID = t4.TBSPACEID AND
       t4.TABSCHEMA='<myschema>' AND
       t4.TABNAME='<mytab>' AND
       t1.DBPGNAME = t2.DBPGNAME AND
       t2.DBPARTITIONNUM = t3.PARTITION_NUMBER;
+1

- , "", (

https://spark.apache.org/docs/2.2.1/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,predicates:Array[String],connectionProperties:java.util.Properties):org.apache.spark.sql.DataFrame

, . Spark , , , , .

, , int, -, , , db (- https://www.ibm.com/support/knowledgecenter/en/SSEPGG_9.7.0/com.ibm.db2.luw.sql.rtn.doc/doc/r0055167.html ). ,

mod (abs (yourhashfunction (yourstringid)), numOfBuckets) + 1 = bucketNumber

, .

, , (.. AND partitiondate = somemeaningfuldate).

Finally, it should be noted that this is usually not as good as the identifier column, because it probably requires a full or wider scan of your target indexes, but it still far surpasses anything else.

0
source

Source: https://habr.com/ru/post/1650967/


All Articles