Sharing data streams with Spark

Spark Streaming processes data in micro-batches.

Each batch interval is processed in parallel as an RDD, without any data exchange between intervals.

But my use case needs to share data between intervals.

Consider the Network WordCount example, which counts all the words received in the current interval.
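For reference, a minimal sketch of that per-interval word count, assuming a socket source on localhost:9999 and a 10-second batch interval (both placeholders), looks roughly like this:

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 // Minimal NetworkWordCount-style job: every interval is counted independently.
 val conf = new SparkConf().setAppName("NetworkWordCount")
 val ssc = new StreamingContext(conf, Seconds(10))

 val lines = ssc.socketTextStream("localhost", 9999)
 // lowercase so that "Hadoop" and "hadoop" count together, as in the example below
 val words = lines.flatMap(_.split(" ")).map(_.toLowerCase)

 // counts only the words received in the current interval; nothing is carried over
 val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
 wordCounts.print()

 ssc.start()
 ssc.awaitTermination()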

How do I produce the following word counts?

  • The count of the words "hadoop" and "spark" relative to the previous interval

  • The normal count for all other words.

Note: updateStateByKey does stateful processing, but it applies the update function to every record, not only to particular records.

So updateStateByKey is not suitable for this requirement.
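For comparison, here is a hedged sketch of updateStateByKey, assuming the ssc and wordCounts pair DStream from the sketch above and a hypothetical checkpoint directory. The update function is invoked for every key and accumulates a running total, rather than the per-interval difference wanted here:

 // updateStateByKey requires checkpointing; the directory is a placeholder
 ssc.checkpoint("/tmp/streaming-checkpoint")

 // The update function runs for every key in the stream, keeping a cumulative total.
 // It cannot be restricted to just "hadoop" and "spark" other than by branching inside it,
 // and it does not naturally produce "this interval minus the previous interval".
 val runningCounts = wordCounts.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
   Some(newValues.sum + state.getOrElse(0))
 }
 runningCounts.print()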

Update:

Consider the following example.

Interval 1

Input:

 Sample Input with Hadoop and Spark on Hadoop

Output:

 hadoop 2
 sample 1
 input 1
 with 1
 and 1
 spark 1
 on 1

Interval 2

Input:

 Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark

Output:

 another 3
 hadoop 1
 spark 2
 and 2
 sample 1
 input 1
 with 1
 on 1

Explanation:

The first interval gives the normal count for all words.

In the second interval, "hadoop" occurred 3 times, but the output should be 1 (3 - 2, subtracting the previous interval's count of 2).

"spark" occurred 3 times, but the output should be 2 (3 - 1).

For all other words, it should give the normal count.

Thus, while processing the data of the second interval, the job needs the first interval's counts for the words "hadoop" and "spark".

This is a simple example for illustration.

In the actual use case, the fields that need data sharing are part of the RDD elements, and a huge number of values need to be tracked.

That is, instead of just the two words "hadoop" and "spark" in this example, nearly 100k keywords have to be tracked.

Similar operations in Apache Storm:

Distributed caching in Storm

Storm TransactionalWords

1 answer

This is possible by "remembering" the last RDD received and using a left outer join to merge that data with the next streaming batch. We make use of streamingContext.remember so that the RDDs produced by the streaming process are kept for the time we need them.

We rely on the fact that dstream.transform is an operation that executes on the driver, and therefore we have access to all the local object definitions. In particular, we want to update a mutable reference to the latest RDD with the required value on each batch.

A piece of code probably expresses this idea better:

 // configure the streaming context to remember the RDDs produced
 // choose at least 2x the time of the streaming interval
 ssc.remember(Seconds(xx))

 // Initialize "currentData" with an empty RDD of the expected type
 var currentData: RDD[(String, Int)] = sparkContext.emptyRDD

 // classic word count
 val w1dstream = dstream.map(elem => (elem, 1))
 val count = w1dstream.reduceByKey(_ + _)

 // Here is the key to making this work: note how we update the reference
 // to the last RDD after using it.
 val diffCount = count.transform { rdd =>
   val interestingKeys = Set("hadoop", "spark")
   val interesting = rdd.filter { case (k, v) => interestingKeys(k) }
   val countDiff = rdd.leftOuterJoin(currentData)
                      .map { case (k, (v1, v2)) => (k, v1 - v2.getOrElse(0)) }
   currentData = interesting
   countDiff
 }

 diffCount.print()
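For completeness, here is a rough sketch of how this snippet could be wired into a full job; the 10-second batch interval, 60-second remember window, and socket source are placeholder assumptions, not part of the original answer:

 import org.apache.spark.SparkConf
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 val conf = new SparkConf().setAppName("RelativeWordCount")
 val ssc = new StreamingContext(conf, Seconds(10))
 val sparkContext = ssc.sparkContext

 // Keep the RDDs of a batch around long enough for the next batch to join against them.
 ssc.remember(Seconds(60))

 // DStream of individual words, as assumed by the snippet above
 val dstream = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

 // ... word count, transform with leftOuterJoin, and diffCount.print() as shown above ...

 ssc.start()
 ssc.awaitTermination()

Note that currentData only ever holds the counts for the keys in interestingKeys, so for every other word the left outer join finds no match and v2.getOrElse(0) leaves its normal per-interval count untouched.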

Source: https://habr.com/ru/post/986557/

