I have a Structured Streaming implementation that looks like this ...
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
It all works, but the performance of MyWrapper.myWriter is terrible. It effectively acts as a JDBC sink, and it looks like this:
import java.sql.{Connection, SQLException, SQLSyntaxErrorException}
import scala.util.Try
import org.apache.spark.sql.ForeachWriter

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
  var connection: Connection = _

  override def open(partitionId: Long, version: Long): Boolean =
    Try(connection = getRemoteConnection).isSuccess

  override def process(row: Seq[String]): Unit = {
    val statement = connection.createStatement()
    try {
      row.foreach(s => statement.execute(s))
    } catch {
      case e: SQLSyntaxErrorException => println(e)
      case e: SQLException => println(e)
    } finally {
      statement.close() // close() rather than closeOnCompletion(), which leaks when no result set is read
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
So my questions are: is a new ForeachWriter created for each row? That is, are open() and close() called once per row in the dataset?
Is there a better design for higher throughput?
How can I parse an SQL statement once and execute it many times, while also keeping the database connection open?
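For reference, the direction I have in mind is sketched below: one connection and one PreparedStatement per partition, with rows accumulated via addBatch() and flushed in bulk. This is only a sketch, not working code: getRemoteConnection, the table name, and the single-parameter INSERT are placeholders, and it assumes the rows can be mapped to one parameterized statement rather than executed as arbitrary SQL strings.

```scala
import java.sql.{Connection, PreparedStatement, SQLException}
import org.apache.spark.sql.ForeachWriter

// Sketch only. `getRemoteConnection` and the INSERT target are placeholders.
val batchedWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
  var connection: Connection = _
  var statement: PreparedStatement = _
  var pending = 0

  override def open(partitionId: Long, version: Long): Boolean = {
    try {
      connection = getRemoteConnection          // placeholder helper
      connection.setAutoCommit(false)           // commit per batch, not per row
      // Parsed once here, executed many times in process().
      statement = connection.prepareStatement(
        "INSERT INTO my_table (payload) VALUES (?)")
      true
    } catch {
      case e: SQLException =>
        println(e)
        false                                   // returning false skips this partition
    }
  }

  override def process(row: Seq[String]): Unit = {
    row.foreach { s =>
      statement.setString(1, s)
      statement.addBatch()
    }
    pending += row.size
    if (pending >= 500) {                       // flush roughly every 500 rows (tunable)
      statement.executeBatch()
      connection.commit()
      pending = 0
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) {
      statement.executeBatch()                  // flush any remainder
      connection.commit()
      statement.close()
    }
    if (connection != null) connection.close()
  }
}
```

Is this per-partition batching the intended way to use ForeachWriter, or is there a better-supported sink for this?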