Spark Streaming and implementing a connection pool

The Spark Streaming programming guide (https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams) gives the following code:

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }

I tried to implement this using org.apache.commons.pool2, but the application launch failed with the expected java.io.NotSerializableException:

    15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
    java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        ...

I am wondering how realistic it is to implement a connection pool that can be serialized. Has anyone succeeded in doing this?

Thanks.

+6
2 answers

To solve this "local resource" problem, you need a singleton object, i.e. an object that is guaranteed to be instantiated once and only once per JVM. Fortunately, a Scala object provides this functionality out of the box.

Secondly, this singleton will provide services for all tasks running on the same JVM where it is located, so it MUST take care of concurrency and resource management.
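
Why a singleton sidesteps the NotSerializableException can be seen without Spark at all. The following is a minimal sketch using only Java serialization. All names in it (SingletonDemo, FakePool, CapturingTask, PoolHolder, SingletonTask, serializes) are hypothetical and invented for illustration; FakePool merely stands in for a non-serializable pool such as GenericObjectPool.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SingletonDemo {
  // Stand-in for a pool that is not Serializable (hypothetical)
  class FakePool { def borrow(): String = "connection" }

  // Carries the pool as a field, so serializing the task must serialize the pool
  class CapturingTask(val pool: FakePool) extends Serializable

  // Singleton: initialized lazily, once per JVM, on first access
  object PoolHolder { lazy val pool = new FakePool }

  // Holds no pool field; looks the pool up through the singleton when run
  class SingletonTask extends Serializable {
    def run(): String = PoolHolder.pool.borrow()
  }

  // Returns true if Java serialization accepts the object
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializes(new CapturingTask(new FakePool))) // pool travels with the task
    println(serializes(new SingletonTask))               // pool stays behind in the JVM
  }
}
```

The task that reaches the pool through the singleton has nothing non-serializable to ship, which is the property the foreachPartition closure needs.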

Here is a sketch (*) of such a service:

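    import java.net.Socket
    import org.apache.commons.pool2.ObjectPool

    class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
      // hand the socket back to the pool it was borrowed from
      def release(): Unit = pool.returnObject(socket)
    }

    // singleton object: one instance per JVM
    object SocketPool {
      var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()

      sys.addShutdownHook {
        // terminate each pool
        hostPortPool.values.foreach(_.close())
      }

      // factory method
      def apply(host: String, port: Int): ManagedSocket = {
        // guard the shared map: tasks running on the same JVM call this concurrently
        val pool = synchronized {
          hostPortPool.getOrElse((host, port), {
            val p: ObjectPool[Socket] = ??? // create new pool for (host, port)
            hostPortPool += (host, port) -> p
            p
          })
        }
        new ManagedSocket(pool, pool.borrowObject())
      }
    }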

Usage would then be:

    val host = ???
    val port = ???
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val mSocket = SocketPool(host, port)
        partition.foreach { elem =>
          val os = mSocket.socket.getOutputStream()
          // do stuff with os + elem
        }
        mSocket.release()
      }
    }

I assume that the GenericObjectPool used in the question takes care of concurrency. Otherwise, access to each pool instance must be guarded by some form of synchronization.

(*) The code is provided to illustrate the idea of how to create such an object; additional effort is needed to turn it into a working version.
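
One way to handle the synchronization of the host/port lookup is to let a concurrent map do it. The following is a minimal sketch, assuming Scala 2.12 or later (for the lambda-to-Java-interface conversion); RegistryDemo, Registry and the string "pools" are hypothetical placeholders, and in the real service the value type would be the pool for a given (host, port).

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object RegistryDemo {
  // Generic per-key registry; `create` builds the resource (e.g. a pool) for a key
  class Registry[K, V](create: K => V) {
    private val entries = new ConcurrentHashMap[K, V]()

    // computeIfAbsent applies `create` at most once per key, even under contention
    def get(key: K): V = entries.computeIfAbsent(key, (k: K) => create(k))
  }

  def main(args: Array[String]): Unit = {
    val created = new AtomicInteger(0)
    val registry = new Registry[(String, Int), String]({ case (h, p) =>
      created.incrementAndGet()
      s"pool for $h:$p"
    })

    // Eight threads ask for the same (host, port) at the same time
    val threads = (1 to 8).map { _ =>
      new Thread(() => { registry.get(("localhost", 9999)); () })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    println(created.get()) // number of times `create` ran
  }
}
```

This only protects the lookup table. Borrowing and returning objects still relies on the pool implementation itself being thread-safe.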

+10

The answer below is incorrect! I am leaving it here for reference, but it is wrong for the following reason: socketPool is declared as a lazy val, so it is instantiated on first access. Since the SocketPool case class is not Serializable, this means it will be instantiated within each partition. This makes the connection pool useless, because we want to keep connections open across partitions and RDDs. It makes no difference whether this is implemented as a companion object or as a case class. Bottom line: the connection pool must be Serializable, and the Apache Commons pool is not.

    import java.io.PrintStream
    import java.net.Socket
    import org.apache.commons.pool2.{PooledObject, BasePooledObjectFactory}
    import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
    import org.apache.spark.streaming.dstream.DStream

    /**
     * Publish a Spark stream to a socket.
     */
    class PooledSocketStreamPublisher[T](host: String, port: Int) extends Serializable {

      lazy val socketPool = SocketPool(host, port)

      /**
       * Publish the stream to a socket.
       */
      def publishStream(stream: DStream[T], callback: (T) => String) = {
        stream.foreachRDD { rdd =>
          rdd.foreachPartition { partition =>
            val socket = socketPool.getSocket
            val out = new PrintStream(socket.getOutputStream)
            partition.foreach { event =>
              val text : String = callback(event)
              out.println(text)
              out.flush()
            }
            out.close()
            socketPool.returnSocket(socket)
          }
        }
      }
    }

    class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {
      def create(): Socket = {
        new Socket(host, port)
      }

      def wrap(socket: Socket): PooledObject[Socket] = {
        new DefaultPooledObject[Socket](socket)
      }
    }

    case class SocketPool(host: String, port: Int) {
      val socketPool = new GenericObjectPool[Socket](new SocketFactory(host, port))

      def getSocket: Socket = {
        socketPool.borrowObject
      }

      def returnSocket(socket: Socket) = {
        socketPool.returnObject(socket)
      }
    }

which you can call as follows:

    val socketStreamPublisher = new PooledSocketStreamPublisher[MyEvent](host = "10.10.30.101", port = 29009)
    socketStreamPublisher.publishStream(myEventStream, (e: MyEvent) => Json.stringify(Json.toJson(e)))
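
The reason this answer fails, a pool held in a lazy val being rebuilt for every deserialized copy of the publisher, can be reproduced without Spark. The following is a minimal sketch, assuming plain Java serialization as a stand-in for how the closure reaches each task; LazyValDemo, FakePool, Publisher and roundTrip are hypothetical names, and FakePool stands in for the non-serializable SocketPool.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyValDemo {
  // Stand-in for a pool that is not Serializable (hypothetical)
  class FakePool

  // Mirrors the shape of PooledSocketStreamPublisher: Serializable, pool in a lazy val
  class Publisher extends Serializable {
    lazy val pool = new FakePool
  }

  // Serialize and deserialize, roughly what happens when a task receives its closure
  def roundTrip[A <: AnyRef](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val publisher = new Publisher // pool not touched yet, so it is still uninitialized
    val task1 = roundTrip(publisher)
    val task2 = roundTrip(publisher)
    // Each deserialized copy initializes its own pool on first access
    println(task1.pool eq task2.pool)
  }
}
```

Two copies end up with two separate pools, so nothing is shared between them, whereas a pool reached through a singleton object is shared by everything running in the same JVM.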
+1

Source: https://habr.com/ru/post/987902/

