I use Spark Streaming to process imported data. The imported data is stored in a DStream. In addition, I have the classes Creation and Update, each of which wraps a Foo object.
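For context, here is a minimal sketch of the class hierarchy as I use it. The common Change trait with a foo member is an assumption I add here so both change types can be handled uniformly; the field names of Foo are made up for the example:

```scala
// Minimal stand-ins for the classes described above.
// Foo's fields are hypothetical; only the wrapping structure matters.
case class Foo(id: Int, value: String)

// Assumed common trait so that both change types expose the wrapped Foo.
sealed trait Change { def foo: Foo }
case class Creation(foo: Foo) extends Change
case class Update(foo: Foo) extends Change

object Sketch {
  def main(args: Array[String]): Unit = {
    val c: Change = Creation(Foo(1, "a"))
    println(c.foo) // the wrapped object is accessible through the trait
  }
}
```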
One of the tasks I want to accomplish is change detection. For that I join two RDDs: one holds the data batch currently being processed, the other holds the current state. currentState is initially empty.
    val stream: DStream[Foo]
    var currentState: RDD[Foo] // initially empty; a var, since it is reassigned below

    val changes = stream.transform { batch =>
      batch.leftOuterJoin(currentState).map {
        case (objectNew, Some(objectOld)) => Update(objectNew)
        case (objectNew, None)            => Creation(objectNew)
      }
    }

    currentState = currentState.fullOuterJoin(changes).map {
      case (Some(foo), None)   => foo
      case (_, Some(change))   => change.foo
    }.cache()
Then I filter out the updates.
    changes.filter(!_.isInstanceOf[Update])
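To illustrate what this filter keeps, here is a self-contained sketch on a plain Scala collection standing in for the RDD (the class definitions are minimal assumptions matching the description above): only the Creation elements survive.

```scala
// Plain-collection stand-in for the RDD of changes.
case class Foo(id: Int)
sealed trait Change { def foo: Foo }
case class Creation(foo: Foo) extends Change
case class Update(foo: Foo) extends Change

object FilterDemo {
  def main(args: Array[String]): Unit = {
    val changes: List[Change] =
      List(Creation(Foo(1)), Update(Foo(1)), Creation(Foo(2)))

    // Same predicate as in the question: drop every Update.
    val creations = changes.filter(!_.isInstanceOf[Update])

    println(creations) // List(Creation(Foo(1)), Creation(Foo(2)))
  }
}
```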
Now I import the same data twice. Since the state is initially empty, the result set of the first import consists only of Creation objects, and that of the second only of Update objects, so after filtering the second result set of changes is empty. In this case I notice a significant drop in performance. Everything works fine if I leave out the filter.
I can't imagine that this is the intended behavior; maybe it is a problem with Spark's internal computation. Can anyone explain why this is happening?