Redis on Spark: Task not serializable

We use Redis on Spark to cache our key-value pairs. This is the code:

    import com.redis.RedisClient

    val r = new RedisClient("192.168.1.101", 6379)

    val perhit = perhitFile.map(x => {
      val arr = x.split(" ")
      val readId = arr(0).toInt
      val refId = arr(1).toInt
      val start = arr(2).toInt
      val end = arr(3).toInt
      val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
      val readStr = r.hmget("readStr", readId).get(readId)
      val realend = if (end > refStr.length - 1) refStr.length - 1 else end
      val refOneStr = refStr.substring(start, realend)
      (readStr, refOneStr, refId, start, realend, readId)
    })

But when I run it, I get the following exception:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at com.ynu.App$.main(App.scala:511)
        at com.ynu.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: com.redis.RedisClient
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 12 more

Can anyone tell me how to work with the data from Redis here? Thanks a lot.

2 answers

In Spark, functions applied to an RDD (like the map here) are serialized and sent to the executors for processing. This implies that everything referenced by those closures must be serializable.

The Redis connection cannot be serialized here, because it opens TCP connections to the target database, and those connections are bound to the machine where the client was created.

The solution is to create those connections on the executors, in the local execution context. There are several ways to do that. Two that I like are:

  • rdd.mapPartitions : lets you process a whole partition at once and therefore amortize the cost of creating the connection
  • Singleton connection managers: create the connection once per executor

mapPartitions is simpler, because all it requires is a small change to the structure of the program:

    val perhit = perhitFile.mapPartitions { partition =>
      // create the connection in the context of the mapPartitions operation
      val r = new RedisClient("192.168.1.101", 6379)
      val res = partition.map { x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
      }.toList // materialize the lazy iterator before closing the connection
      r.close // take care of resources
      res.iterator
    }

A singleton connection manager can be modeled with an object holding a lazy val reference to the connection (note: a mutable ref would also work; see the sketch after the usage example below).

    object RedisConnection extends Serializable {
      lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
    }

This object can then be used to create one connection instance per worker JVM, and it is referenced as a Serializable object in the closure of the operation.

    val perhit = perhitFile.map { x =>
      val param = f(x)
      val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection local to the executor
    }
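For completeness, here is a minimal sketch of the mutable-ref variant mentioned above. This is not from the original answer: the object name, the null check and the synchronization are illustrative, and it assumes the same RedisClient constructor used throughout.

    object RedisConnectionRef extends Serializable {
      // Marked @transient so the field itself is never shipped with the closure.
      @transient private var client: RedisClient = null

      // Create the connection lazily, once per executor JVM; synchronized for thread safety.
      def conn: RedisClient = synchronized {
        if (client == null) {
          client = new RedisClient("192.168.1.101", 6379)
        }
        client
      }
    }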

The advantage of using a singleton object is lower overhead, since connections are created only once per JVM (as opposed to once per RDD partition).

There are some disadvantages:

  • connection cleanup is difficult (shutdown hooks / timers; see the sketch below)
  • you must ensure thread safety of the shared resource
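As one possible mitigation for the cleanup issue, here is a sketch only (not part of the original answer; it reuses the close call from the mapPartitions example and assumes it is available on RedisClient): the lazy initializer in the singleton can register a JVM shutdown hook so the connection is closed when the executor JVM exits.

    object RedisConnection extends Serializable {
      lazy val conn: RedisClient = {
        val client = new RedisClient("192.168.1.101", 6379)
        // Close the connection when the executor JVM shuts down.
        sys.addShutdownHook {
          client.close
        }
        client
      }
    }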

(*) code provided for illustration. Not compiled or tested.


You are trying to serialize the client itself. You have one RedisClient, r, which you try to use inside the map, and that closure will be executed on different nodes of the cluster. Either fetch the data from Redis separately, before the cluster job runs, or create a client separately for each cluster task inside your map block (preferably using mapPartitions rather than map, since creating a new Redis client for every single line is probably a bad idea).
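A minimal sketch of the first option, fetching the data up front on the driver and broadcasting it to the executors. This is not from the answer: it assumes both hashes fit comfortably in driver memory, that sc is the SparkContext, and that the hashes can be read with com.redis's hgetall using String keys; only the lookup part of the original map is kept for brevity.

    // Fetch both hashes once on the driver, then close the client.
    val r = new RedisClient("192.168.1.101", 6379)
    val refMap = r.hgetall[String, String]("refStr").getOrElse(Map.empty[String, String])
    val readMap = r.hgetall[String, String]("readStr").getOrElse(Map.empty[String, String])
    r.close

    // Broadcast plain Scala maps (which are serializable) to the executors.
    val refBc = sc.broadcast(refMap)
    val readBc = sc.broadcast(readMap)

    val perhit = perhitFile.map { x =>
      val arr = x.split(" ")
      val readId = arr(0)
      val refId = arr(1)
      // Look up the broadcast maps instead of calling Redis from inside the closure.
      val refStr = refBc.value(refId).split(",")(1)
      val readStr = readBc.value(readId)
      (readStr, refStr, refId, readId)
    }

Broadcasting avoids opening any Redis connection on the executors, at the cost of shipping the whole dataset to each worker.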


Source: https://habr.com/ru/post/1012532/

