To solve this "local resource" problem, you need a singleton object, i.e. an object that is guaranteed to be created once and only once in the JVM. Fortunately, a Scala object provides this functionality out of the box.
Second, this singleton serves every task running in the same JVM, so it MUST take care of concurrency and resource management.
As a sketch (*), consider a service like this:
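To make both points concrete, here is a minimal sketch that uses only the standard library. The names `ConnectionRegistry`, `FakePool`, `poolFor` and `RegistryDemo` are hypothetical, and `FakePool` is just a stand-in for a real pool. The idea is that the object body runs once per JVM (more precisely, once per class loader), and that `ConcurrentHashMap.computeIfAbsent` lets concurrent tasks asking for the same (host, port) end up sharing a single pool.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

// Hypothetical stand-in for a real pool; it only records its key.
final class FakePool(val host: String, val port: Int)

object ConnectionRegistry {
  // Counts how many times the object body runs (expected: once).
  val initCount = new AtomicInteger(0)
  initCount.incrementAndGet()

  // Counts how many pools were actually constructed.
  val poolsCreated = new AtomicInteger(0)

  private val pools = new ConcurrentHashMap[(String, Int), FakePool]()

  // computeIfAbsent applies the factory at most once per key, even under contention.
  def poolFor(host: String, port: Int): FakePool =
    pools.computeIfAbsent((host, port), new JFunction[(String, Int), FakePool] {
      override def apply(key: (String, Int)): FakePool = {
        poolsCreated.incrementAndGet()
        new FakePool(key._1, key._2)
      }
    })
}

object RegistryDemo {
  // Simulates several tasks in one JVM requesting the same pool concurrently.
  // Returns the number of distinct pool instances the tasks received.
  def simulate(threads: Int): Int = {
    val executor = Executors.newFixedThreadPool(threads)
    val seen = ConcurrentHashMap.newKeySet[FakePool]()
    val done = new CountDownLatch(threads)
    (1 to threads).foreach { _ =>
      executor.execute(new Runnable {
        override def run(): Unit = {
          seen.add(ConnectionRegistry.poolFor("localhost", 9999))
          done.countDown()
        }
      })
    }
    done.await(10, TimeUnit.SECONDS)
    executor.shutdown()
    seen.size
  }
}
```

Calling `RegistryDemo.simulate(8)` should report one distinct pool. An explicit `java.util.function.Function` is used instead of a lambda so the sketch does not depend on the Scala version's SAM conversion.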
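    // assumption: Apache Commons Pool 2; adjust the import to the pool library you use
    import java.net.Socket
    import org.apache.commons.pool2.ObjectPool

    class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
      def release(): Unit = pool.returnObject(socket)
    }

    // singleton object
    object SocketPool {
      private var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()

      sys.addShutdownHook {
        hostPortPool.values.foreach { pool =>
          pool.close() // terminate each pool
        }
      }

      // factory method; synchronized because tasks on several threads share the map
      def apply(host: String, port: Int): ManagedSocket = synchronized {
        val pool = hostPortPool.getOrElse((host, port), {
          val p: ObjectPool[Socket] = ??? // create new pool for (host, port)
          hostPortPool += (host, port) -> p
          p
        })
        new ManagedSocket(pool, pool.borrowObject())
      }
    }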
Usage would then be:
    val host = ???
    val port = ???
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // one socket per partition, borrowed on the executor
        val mSocket = SocketPool(host, port)
        partition.foreach { elem =>
          val os = mSocket.socket.getOutputStream()
          // write elem to os
        }
        mSocket.release()
      }
    }
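The per-partition pattern can be tried without Spark or a real socket. The following sketch is only an illustration: `FakeConnection`, `FakeConnectionPool` and `PartitionWriter` are hypothetical names, and an in-memory stream stands in for the socket. It borrows once per partition rather than once per element, and returns the connection in a `finally` block so that a failing element does not leak it.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a pooled socket: it only exposes an output stream.
final class FakeConnection(val out: ByteArrayOutputStream)

// Hypothetical pool with a single shared connection; it counts borrows and
// releases so the pattern can be checked.
object FakeConnectionPool {
  var borrowed = 0
  var released = 0
  private val shared = new FakeConnection(new ByteArrayOutputStream())

  def borrow(): FakeConnection = synchronized { borrowed += 1; shared }
  def release(c: FakeConnection): Unit = synchronized { released += 1 }
  def written: String = synchronized { shared.out.toString("UTF-8") }
}

object PartitionWriter {
  // Mirrors the body passed to foreachPartition: one borrow per partition,
  // and the connection is returned even if writing an element throws.
  def writePartition(partition: Iterator[String]): Unit = {
    val conn = FakeConnectionPool.borrow()
    try {
      partition.foreach { elem =>
        conn.out.write((elem + "\n").getBytes(StandardCharsets.UTF_8))
      }
    } finally {
      FakeConnectionPool.release(conn)
    }
  }
}
```

Each call to `PartitionWriter.writePartition` plays the role of one task. With a real pool, the `finally` block is where `release()` on the managed socket would go.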
I'm assuming that the GenericObjectPool used in the question takes care of concurrency. If it does not, access to each pool instance must be guarded by some form of synchronization.
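If the pool were not thread-safe, one simple form of synchronization is a wrapper that serializes every call on a single lock. The sketch below is an assumption-laden illustration: `UnsafePool` and `SynchronizedPool` are hypothetical classes written for this example and are not part of any pool library.

```scala
import scala.collection.mutable

// Hypothetical minimal pool that is NOT thread-safe on its own.
final class UnsafePool[T](factory: () => T) {
  private val idle = mutable.Queue.empty[T]
  var created = 0

  // Reuses an idle object if one exists, otherwise creates a new one.
  def borrowObject(): T =
    if (idle.nonEmpty) idle.dequeue() else { created += 1; factory() }

  def returnObject(obj: T): Unit = idle.enqueue(obj)
}

// Wrapper that guards every call to the underlying pool with one lock.
final class SynchronizedPool[T](underlying: UnsafePool[T]) {
  private val lock = new Object

  def borrowObject(): T = lock.synchronized(underlying.borrowObject())
  def returnObject(obj: T): Unit = lock.synchronized(underlying.returnObject(obj))
  def created: Int = lock.synchronized(underlying.created)
}
```

A coarse lock like this is easy to reason about but limits throughput, which is one reason to prefer a pool implementation that already handles concurrency.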
(*) This code only illustrates the idea of how to design such an object; it needs additional work to become a working version.