I have a Spark (v1.2.1) job that inserts the contents of an RDD into Postgres using postgresql.Driver for Scala:
import java.sql.{Connection, DriverManager}
import org.postgresql.util.PSQLException

rdd.foreachPartition(iter => {
  // One connection per partition: opened here, closed once the partition is written.
  Class.forName("org.postgresql.Driver")
  val connection: Connection = DriverManager.getConnection(url, username, password)
  val statement = connection.createStatement()
  iter.foreach(row => {
    val mapRequest = Utils.getInsertMap(row)
    val query = Utils.getInsertRequest(squares_table, mapRequest)
    try { statement.execute(query) }
    catch {
      case pe: PSQLException => println("exception caught: " + pe)
    }
  })
  statement.close()
  connection.close()
})
In the above code, I open a new Postgres connection for each RDD partition and then close it. I think the correct way would be to use a connection pool for Postgres, from which I could take connections (as described here), but it's just pseudo-code:
rdd.foreachPartition { partitionOfRecords =>
  val connection = ConnectionPool.getConnection()
  partitionOfRecords.foreach(record => connection.send(record))
  ConnectionPool.returnConnection(connection)
}
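For what it's worth, here is a rough sketch of what I imagine that ConnectionPool could look like, assuming Apache Commons DBCP2 is on the executors' classpath; the ConnectionPool object, its settings, and the connection details are my own placeholders, not something Spark provides:

import java.sql.Connection
import org.apache.commons.dbcp2.BasicDataSource

// Hypothetical helper: a Scala object is initialized once per executor JVM,
// so every partition processed on that executor would share the same pool.
object ConnectionPool {
  // Placeholder settings; in practice these would come from configuration.
  private val url = "jdbc:postgresql://localhost:5432/mydb"
  private val username = "user"
  private val password = "secret"

  private lazy val dataSource: BasicDataSource = {
    val ds = new BasicDataSource()
    ds.setDriverClassName("org.postgresql.Driver")
    ds.setUrl(url)
    ds.setUsername(username)
    ds.setPassword(password)
    ds.setInitialSize(2)  // pool sizing here is purely illustrative
    ds.setMaxTotal(10)
    ds
  }

  def getConnection(): Connection = dataSource.getConnection()

  // With DBCP2, "returning" a pooled connection is just closing it:
  // close() hands it back to the pool instead of tearing it down.
  def returnConnection(connection: Connection): Unit = connection.close()
}

If I understand correctly, the pool would then be built lazily on each executor the first time a partition asks for a connection, and reused by all later partitions on that executor instead of opening a fresh connection every time.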
What is the correct way to connect to Postgres from Spark using a connection pool?