In Spark, functions on an RDD (like the map here) are serialized and sent to the executors for processing. This means that all elements referenced within those operations must be serializable.
The Redis connection here is not serializable, because it opens a TCP connection to the target database that is bound to the machine where it was created.
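For context, the failing pattern looks something like this (a sketch, assuming the scala-redis client; the hash key and field are made-up placeholders, not the asker's actual code):

import com.redis.RedisClient

val r = new RedisClient("192.168.1.101", 6379) // created on the driver

val perhit = perhitFile.map { x =>
  r.hmget("someHash", x) // the closure captures r, which wraps a live TCP socket
}

When Spark tries to serialize the closure it fails on r, typically aborting the job with a "Task not serializable" error.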
The solution is to create those connections on the executors, in the local execution context. There are several ways to do this. Two that I like are:
- rdd.mapPartitions: lets you process a whole partition at once and therefore amortize the cost of creating connections
- singleton connection managers: create the connection once per executor
mapPartitions is simpler because all it requires is a small change to the structure of the program:
val perhit = perhitFile.mapPartitions { partition =>
  val r = new RedisClient("192.168.1.101", 6379) // create the connection in the local context of this partition
  val res = partition.map { x =>
    val refStr = r.hmget(...) // use r against the partition-local data
    ...
  }.toList // materialize the lazy iterator before closing the connection
  r.disconnect // release the connection
  res.iterator
}
A singleton connection manager can be modeled with an object that holds a lazy reference to the connection (note: a mutable ref would also work).
object RedisConnection extends Serializable {
  lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
This object can then be used to instantiate one connection per worker JVM: the object itself is serializable and can safely be captured by the operation's closure, while the lazy val ensures the connection is only created on first access, on the executor rather than on the driver.
val perhit = perhitFile.map { x =>
  val param = f(x)
  val refStr = RedisConnection.conn.hmget(...) // use the shared per-JVM connection
  ...
}
The advantage of the singleton object is lower overhead, since connections are created only once per JVM (as opposed to once per RDD partition).
There are also some disadvantages:
- cleanup of connections is tricky (shutdown hooks / timers); see the sketch after this list
- you must ensure thread-safety of shared resources
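As an illustration of the cleanup point, here is one possible approach (a sketch under assumptions: disconnect and RedisClientPool come from the scala-redis client, and a shutdown hook is only one of several cleanup strategies). It registers a JVM shutdown hook when the lazy connection is first created, and synchronizes shared access since a single client is not guaranteed to be thread-safe:

import com.redis.RedisClient

object RedisConnection extends Serializable {
  lazy val conn: RedisClient = {
    val c = new RedisClient("192.168.1.101", 6379)
    sys.addShutdownHook { c.disconnect } // best-effort cleanup when the executor JVM exits
    c
  }

  // Serialize access to the shared client; a pool (e.g. RedisClientPool
  // in scala-redis) is an alternative if contention becomes a problem.
  def withConn[T](f: RedisClient => T): T = conn.synchronized(f(conn))
}

Callers would then go through withConn (e.g. RedisConnection.withConn(_.hmget(...))) instead of touching conn directly.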
(*) code provided for illustration. Not compiled or tested.
maasg