Can I use Flink state to join a stream with static data?

I am evaluating Apache Flink for stream processing as a replacement for, or addition to, Apache Spark. One of the tasks we usually solve with Spark is data enrichment.

For example, I have a data stream from IoT sensors carrying a sensor ID, and I have a set of sensor metadata. I want to convert the input stream into a stream of sensor measurements enriched with the matching sensor metadata.

In Spark, I can join a DStream with an RDD:

case class SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
val staticMetadata: RDD[(Long, SensorMetadata)] =
  spark.read.json(...).as[SensorMetadata]
    .map { s => (s.sensorId, s) }.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
  sensorInput.map { s => (s.sensorId, s) }.transform { rdd: RDD[(Long, SensorValue)] =>
    rdd.join(staticMetadata)
       .map { case (_, (s, m)) => (s, m) } // get rid of the nested tuple
  }

Can I do the same trick with Apache Flink? I do not see a direct API for this. The only idea I have is to use a stateful transformation: I can connect the metadata and the sensor events into a single stream and use Flink state to store the metadata (pseudocode):

val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val staticMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
  sensorInput.keyBy(_.sensorId)
    .connect(staticMetadata.keyBy(_.sensorId))
    .flatMap(new RichCoFlatMapFunction[SensorValue, SensorMetadata, (SensorValue, SensorMetadata)] {
      private var md: ValueState[SensorMetadata] = _

      override def open(parameters: Configuration): Unit =
        md = getRuntimeContext.getState(
          new ValueStateDescriptor("metadata", classOf[SensorMetadata]))

      // sensor events: emit them together with the stored metadata
      override def flatMap1(s: SensorValue, out: Collector[(SensorValue, SensorMetadata)]): Unit =
        out.collect((s, md.value))

      // metadata records: update the keyed state
      override def flatMap2(m: SensorMetadata, out: Collector[(SensorValue, SensorMetadata)]): Unit =
        md.update(m)
    })

Is this the right approach? Is it a common practice for this kind of task?


Using a CoFlatMapFunction is the right approach here. Be aware, however, that you cannot control the order in which the two inputs are consumed: a sensor event may arrive before the metadata for its key, in which case md.value would still be empty. To handle this, you would need to buffer sensor events in state until the metadata for their key arrives, and emit them afterwards. Also note that the state of a keyed CoFlatMapFunction is scoped per key, so the metadata is stored per sensor ID. The state is included in Flink's checkpoints and restored after a failure, and it can grow beyond the available memory (with the RocksDB state backend, state is kept on disk).
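A minimal sketch of that buffering idea, reusing the SensorValue and SensorMetadata types from the question (the EnrichmentFunction class name and the state names are illustrative, not from the answer):

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class EnrichmentFunction
    extends RichCoFlatMapFunction[SensorValue, SensorMetadata, (SensorValue, SensorMetadata)] {

  private var md: ValueState[SensorMetadata] = _
  private var buffered: ListState[SensorValue] = _

  override def open(parameters: Configuration): Unit = {
    md = getRuntimeContext.getState(
      new ValueStateDescriptor("metadata", classOf[SensorMetadata]))
    buffered = getRuntimeContext.getListState(
      new ListStateDescriptor("buffered-events", classOf[SensorValue]))
  }

  override def flatMap1(s: SensorValue, out: Collector[(SensorValue, SensorMetadata)]): Unit = {
    val m = md.value
    if (m != null) out.collect((s, m)) // metadata already known: enrich immediately
    else buffered.add(s)               // otherwise park the event in keyed state
  }

  override def flatMap2(m: SensorMetadata, out: Collector[(SensorValue, SensorMetadata)]): Unit = {
    md.update(m)
    buffered.get().asScala.foreach(s => out.collect((s, m))) // flush waiting events
    buffered.clear()
  }
}

It plugs into the same pipeline as in the question: sensorInput.keyBy(_.sensorId).connect(staticMetadata.keyBy(_.sensorId)).flatMap(new EnrichmentFunction).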

Alternatively, if the metadata is static (it does not change) and small enough to fit into memory, you can read it in the open() method of a rich FlatMapFunction and keep it in a plain member variable; each parallel instance then holds a full copy. The downside is that such metadata is not part of Flink's state, so it is not checkpointed, and it cannot be updated while the job is running: to pick up new metadata, you would have to restart the job.
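A minimal sketch of this variant, assuming the metadata fits in memory; loadMetadataFromJson() is a hypothetical helper that reads the JSON file, not a Flink API:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class StaticEnrichmentFunction
    extends RichFlatMapFunction[SensorValue, (SensorValue, SensorMetadata)] {

  // plain JVM field, not Flink state: not checkpointed, not updatable at runtime
  @transient private var metadata: Map[Long, SensorMetadata] = _

  override def open(parameters: Configuration): Unit =
    // runs once per parallel task before any records are processed
    metadata = loadMetadataFromJson().map(m => m.sensorId -> m).toMap

  override def flatMap(s: SensorValue, out: Collector[(SensorValue, SensorMetadata)]): Unit =
    metadata.get(s.sensorId).foreach(m => out.collect((s, m)))
}

Since every parallel instance holds the full map, no keyBy is required: sensorInput.flatMap(new StaticEnrichmentFunction).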


Source: https://habr.com/ru/post/1658055/
