I got this working by creating a wrapper class around the broadcast variable. The wrapper's updateAndGet method returns the refreshed broadcast variable. I call this method inside dStream.transform, which is allowed according to the Spark documentation:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation
Transform operation: "the supplied function gets called in every batch interval. This allows you to do time-varying RDD operations, that is, RDD operations, number of partitions, broadcast variables, etc. can be changed between batches."
The BroadcastWrapper class will look like this:
    import java.util.Calendar;
    import java.util.Date;

    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;

    public class BroadcastWrapper {

        private Broadcast<ReferenceData> broadcastVar;
        private Date lastUpdatedAt = Calendar.getInstance().getTime();

        private static BroadcastWrapper obj = new BroadcastWrapper();

        private BroadcastWrapper() {}

        public static BroadcastWrapper getInstance() {
            return obj;
        }

        public JavaSparkContext getSparkContext(SparkContext sc) {
            return JavaSparkContext.fromSparkContext(sc);
        }

        public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext) {
            Date currentDate = Calendar.getInstance().getTime();
            long diff = currentDate.getTime() - lastUpdatedAt.getTime();
            // Re-broadcast if there is no broadcast yet, or the last one is older than 60 s
            if (broadcastVar == null || diff > 60000) {
                if (broadcastVar != null) {
                    broadcastVar.unpersist();
                }
                lastUpdatedAt = new Date(System.currentTimeMillis());
                ReferenceData data = getRefData(); // your logic to fetch fresh reference data
                broadcastVar = getSparkContext(sparkContext).broadcast(data);
            }
            return broadcastVar;
        }
    }
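The class assumes a ReferenceData type and a getRefData() helper that this answer does not show. Purely as a minimal sketch so the code compiles, they might look like the following; the Map-based lookup and the hard-coded refresh are my assumptions, not part of the answer:

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in for the ReferenceData type broadcast above.
    // It must be Serializable so Spark can ship it to the executors.
    public class ReferenceData implements Serializable {
        private final Map<String, String> lookup;

        public ReferenceData(Map<String, String> lookup) {
            this.lookup = lookup;
        }

        public String lookup(String key) {
            return lookup.get(key);
        }
    }

    // Inside BroadcastWrapper: illustrative refresh logic only.
    // Replace with your real source (database, file, REST service, ...).
    private ReferenceData getRefData() {
        Map<String, String> fresh = new HashMap<>();
        fresh.put("key", "value"); // e.g. rows loaded from a lookup table
        return new ReferenceData(fresh);
    }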
You can call this updateAndGet method on the broadcast variable inside the stream.transform operation, which performs an RDD-to-RDD transformation:
    objectStream.transform(rdd -> {
        Broadcast<ReferenceData> refData =
                BroadcastWrapper.getInstance().updateAndGet(rdd.context());
        // ... use refData.value() in your per-batch logic ...
        return rdd;
    });
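To see where that call sits in a whole job, here is a minimal driver sketch, assuming the ReferenceData stub above; the Driver class name, socket source, batch interval, and enrichment logic are illustrative choices, not part of the original answer:

    import org.apache.spark.SparkConf;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class Driver {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setAppName("refresh-broadcast-demo");
            JavaStreamingContext jssc =
                    new JavaStreamingContext(conf, Durations.seconds(10));

            JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

            // transform runs on the driver once per batch, so updateAndGet
            // gets a chance to re-broadcast before each batch is processed.
            JavaDStream<String> enriched = lines.transform(rdd -> {
                Broadcast<ReferenceData> refData =
                        BroadcastWrapper.getInstance().updateAndGet(rdd.context());
                return rdd.map(line -> line + "|" + refData.value().lookup(line));
            });

            enriched.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }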
Hope this helps