Periodic Broadcasting in Apache Spark Streaming

I am implementing a streaming flow to classify text. In my implementation, there are some parameters that need to be updated as new stream elements arrive. For example, I want to change the learning rate as new predictions are made. However, I doubt there is a way to send a variable out again after the first broadcast. So what do I do if I need to re-broadcast a variable every time I update it? If there is a way to do this, or a workaround for what I want to do in Spark Streaming, I would be happy to hear about it.

Thanks in advance.

+6
5 answers

My understanding is that once a broadcast variable has been shipped out, it is read-only. I believe you can update the broadcast variable on the local node, but not on remote nodes.

Maybe you should consider doing this "outside of Spark". How about using a NoSQL store (Cassandra, etc.) or even Memcached? You could then update the value from one task and periodically check that store from the other tasks.
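A minimal sketch of what that could look like (not from the original answer; `dstream` is assumed to be your input DStream, and the file path below simply stands in for whatever external store you pick):

 import scala.io.Source

 // Sketch only: "/shared/params/learning-rate.txt" stands in for an external store
 // (Cassandra, Memcached, ...). One process updates it; every executor can read it.
 def currentLearningRate(): Double =
   Source.fromFile("/shared/params/learning-rate.txt").getLines().next().trim.toDouble

 dstream.foreachRDD { rdd =>
   rdd.foreachPartition { records =>
     // Re-read the parameter once per partition per batch instead of re-broadcasting it.
     val learningRate = currentLearningRate()
     for (record <- records) {
       // update the model for `record` using `learningRate`
     }
   }
 }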

+1

I got an ugly hack working. Looking at the broadcast implementation, we can see how the broadcast value is fetched from the broadcast object using just the broadcast ID: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114

So I periodically re-broadcast through the same broadcast ID:

 val broadcastFactory = new TorrentBroadcastFactory()
 broadcastFactory.unbroadcast(BroadcastId, true, true)
 // append some ids to initIds
 val broadcastContent = broadcastFactory.newBroadcast[Set[String]](initIds, false, BroadcastId)

I can get the BroadcastId from the first broadcast value:

 val broadcastIds = ssc.sparkContext.broadcast(initIds)
 // broadcast id
 val BroadcastId = broadcastIds.id

Then the worker uses the ids as a Broadcast type, as usual:

 def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ??? 
+1
 bkc.unpersist(true)
 bkc.destroy()
 bkc = sc.broadcast(tableResultMap)
 bkv = bkc.value

You can try this; I can't guarantee whether it is efficient.
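A hedged sketch of how that pattern might be driven from a streaming job (my own wording, not from the answer; `sc`, `dstream`, `needsRefresh()` and `loadTableResultMap()` are assumed or hypothetical names):

 import org.apache.spark.broadcast.Broadcast

 // Driver-side state: a mutable handle to the current broadcast.
 var bkc: Broadcast[Map[String, String]] = sc.broadcast(loadTableResultMap()) // hypothetical loader

 dstream.foreachRDD { rdd =>
   if (needsRefresh()) {                      // hypothetical check: has the underlying table changed?
     bkc.unpersist(blocking = true)           // drop stale copies cached on the executors
     bkc = sc.broadcast(loadTableResultMap()) // ship the fresh data
   }
   val current = bkc                          // the handle this batch will use
   rdd.foreach { record =>
     val table = current.value                // fetched lazily on each executor
     // use table(record) as needed
   }
 }

Capturing the handle in the local val `current` keeps the task closure serializable and makes sure every task in the batch sees the same version of the data.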

+1

I got this working by creating a wrapper class around the broadcast variable. The wrapper class's updateAndGet method returns the refreshed broadcast variable. I call this function inside dStream.transform, which is allowed according to the Spark documentation:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

Transform operation: "the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches."

The BroadcastWrapper class will look like this:

 import java.util.Calendar;
 import java.util.Date;

 import org.apache.spark.SparkContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;

 public class BroadcastWrapper {

     private Broadcast<ReferenceData> broadcastVar;
     private Date lastUpdatedAt = Calendar.getInstance().getTime();

     private static BroadcastWrapper obj = new BroadcastWrapper();

     private BroadcastWrapper() {}

     public static BroadcastWrapper getInstance() {
         return obj;
     }

     public JavaSparkContext getSparkContext(SparkContext sc) {
         JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
         return jsc;
     }

     public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
         Date currentDate = Calendar.getInstance().getTime();
         long diff = currentDate.getTime() - lastUpdatedAt.getTime();
         if (broadcastVar == null || diff > 60000) { // let's say we want to refresh every 1 min = 60000 ms
             if (broadcastVar != null) {
                 broadcastVar.unpersist();
             }
             lastUpdatedAt = new Date(System.currentTimeMillis());
             // Your logic to refresh
             ReferenceData data = getRefData();
             broadcastVar = getSparkContext(sparkContext).broadcast(data);
         }
         return broadcastVar;
     }
 }

You can use this updateAndGet method to get the broadcast variable inside the stream.transform method, which allows RDD-to-RDD transformations:

 objectStream.transform(rdd -> {
     Broadcast<ReferenceData> refData =
             BroadcastWrapper.getInstance().updateAndGet(rdd.context());
     /* Your code to manipulate the stream using refData.value() */
     return rdd;
 });

See my full answer in this post: fooobar.com/questions/313293/...

Hope this helps

+1

The best approach is to collect the data at the driver and then broadcast it to all nodes.

Use DStream#foreachRDD to collect the computed RDDs at the driver, and once you know when you need to change the learning rate, use SparkContext#broadcast(value) to send the new value to all nodes.

I expect the code to look something like this:

 dStreamContainingBroadcastValue.foreachRDD { rdd =>
   val valueToBroadcast = rdd.collect()
   sc.broadcast(valueToBroadcast)
 }
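The snippet above sends the new value out but never shows it being picked up again. One hedged way to close that loop (my sketch, not part of the answer; `otherStream` and `initialLearningRate` are assumed names, and the stream is assumed to carry the new learning-rate values) is to keep the handle in a driver-side var and re-read it inside transform, which runs on the driver every batch:

 // Sketch: hold the latest broadcast in the driver and use it from another stream.
 var learningRateBC = sc.broadcast(initialLearningRate)

 dStreamContainingBroadcastValue.foreachRDD { rdd =>
   val collected = rdd.collect()
   if (collected.nonEmpty) {
     learningRateBC.unpersist(blocking = true)
     learningRateBC = sc.broadcast(collected.last)   // keep only the newest value
   }
 }

 otherStream.transform { rdd =>
   val bc = learningRateBC   // re-evaluated every batch because transform runs on the driver
   rdd.map { record =>
     // score `record` with bc.value
     record
   }
 }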

You may also find this thread from the Spark user mailing list useful. Let me know if this works.

0

Source: https://habr.com/ru/post/982652/

