How to write a JDBC sink for Spark Structured Streaming [fixing "Task not serializable"]?

I need a JDBC sink for a data frame in Spark Structured Streaming. At the moment, as far as I know, the DataFrame API has no writeStream support for JDBC (neither in PySpark nor in Scala, current Spark version 2.2.0). The only thing I found was to write my own ForeachWriter Scala class based on this article. So I changed the simple word count example from here by adding a custom ForeachWriter class and tried to write to PostgreSQL. The word stream is generated manually from the console (using netcat: nc -lk -p 9999) and read by Spark from the socket.

Unfortunately, I get a "Task not serializable" ERROR.

APACHE_SPARK_VERSION = 2.1.0, using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)

My Scala code:

//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.

import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("StructuredNetworkWordCountToJDBC")
  .config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
  .getOrCreate()

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))

val wordCounts = words.groupBy("value").count()

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

val url = "jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user = "<user name>"
val pwd = "<pass>"
val writer = new JDBCSink(url, user, pwd)

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()

Error message:

ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
    - field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
    - object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
    - field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
    - object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
    - field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
    - object (class $line21.$read, $line21.$read@13e62f5d)
    - field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
    - object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
    - field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
    - object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
    - field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
    - object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
    - field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
    - object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
    - field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
    - object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 25 more

How can I make it work?

Solution

(Thanks to everyone, special thanks to @zsxwing for the simple solution):

  • Save the JDBCSink class in a separate file.
  • Load that class in the spark shell, e.g. using scala> :load <path_to_a_JDBCSink.scala_file>
  • Finally, scala> :paste the rest of the code without the JDBCSink class definition (a sketch of this flow is shown below).
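
For illustration, a rough sketch of how such a spark-shell session might then look, assuming the JDBCSink class from the question was saved to a hypothetical file /tmp/JDBCSink.scala (the path and connection placeholders are illustrative, not from the original post):

// Load the externally defined sink class first, so it is compiled as a top-level class
// rather than as an inner class of the REPL wrapper.
scala> :load /tmp/JDBCSink.scala

scala> :paste
// Entering paste mode (ctrl-D to finish)

import spark.implicits._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// JDBCSink now comes from the loaded file, not from this paste block
val writer = new JDBCSink("jdbc:postgresql://<host>:<port>/<db>", "<user>", "<pass>")

import org.apache.spark.sql.streaming.ProcessingTime

val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()

query.awaitTermination()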
3 answers

Just define JDBCSink in a separate file, rather than defining it as an inner class that may capture a reference to its outer scope.
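
For example, the standalone file could contain just the class from the question moved to the top level (a sketch; the file name JDBCSink.scala is an assumption):

// JDBCSink.scala - a top-level class, so it captures no outer REPL wrapper
import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  val driver = "org.postgresql.Driver"
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  def process(value: Row): Unit = {
    // same INSERT as in the question; a PreparedStatement would be safer in real code
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}

ForeachWriter itself extends Serializable, so as long as the class holds only the connection settings and JDBC state created in open(), nothing non-serializable is dragged along.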


It seems the culprit here is the use of import spark.implicits._ inside the JDBCSink class:

  • JDBCSink must be serializable
  • By adding this import, you make JDBCSink reference the non-serializable SparkSession, which is then serialized along with it (technically, SparkSession extends Serializable, but it is not meant to be deserialized on worker nodes)

The good news: you are not actually using this import, so if you simply remove it, this should work.


If someone comes across this in a notebook environment, this solution also works:

Instead of saving the JDBCSink class to a separate file, you can also simply declare it in a separate package cell within the same notebook and import that package in the cell where you use it. This is well described here: https://docs.databricks.com/user-guide/notebooks/package-cells.html
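
A rough sketch of that approach, assuming a hypothetical package name mysinks (the class body is just the one from the question, abbreviated):

// Notebook cell 1: a "package cell" - it starts with a package declaration and
// contains only definitions, which get compiled for use in other cells.
package mysinks

import java.sql._
import org.apache.spark.sql.{ForeachWriter, Row}

class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  var connection: Connection = _
  var statement: Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement()
    true
  }

  def process(value: Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = connection.close()
}

The cell where the streaming query is built then only needs to import the compiled class:

// Notebook cell 2: use the sink from the package cell
import mysinks.JDBCSink

val writer = new JDBCSink(url, user, pwd)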


Source: https://habr.com/ru/post/1270323/

