Apache Flink DataStream API does not have a mapPartition transformation

Spark's DStream has a mapPartition API, but the Flink DataStream API does not. Can anyone explain the reason? I want to implement something similar to Spark's reduceByKey on Flink.

2 answers

Flink's stream processing model is quite different from Spark Streaming, which is centered around micro-batches. In Spark Streaming, each micro-batch is executed like a regular batch program on a finite data set, whereas a Flink DataStream program continuously processes records.

In Flink's DataSet API, a MapPartitionFunction has two parameters: an iterator for the input and a collector for the function's result. A MapPartitionFunction in a Flink DataStream program would never return from the first function call, because the iterator would iterate over an endless stream of records. However, Flink's internal stream processing model requires that user function calls return in order to checkpoint the function state. Therefore, the DataStream API does not offer a mapPartition transformation.
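For reference, here is a minimal sketch of what mapPartition looks like in the batch DataSet API (Scala; the class name and element types are illustrative, not from the original answer). The function only returns once the input iterator is exhausted, which an unbounded stream can never guarantee:

 import org.apache.flink.api.common.functions.MapPartitionFunction
 import org.apache.flink.util.Collector
 import scala.collection.JavaConverters._

 // Counts the records of each (finite) partition. On an endless stream,
 // the iteration would never terminate and the function would never return.
 class CountPerPartition extends MapPartitionFunction[String, Int] {
   override def mapPartition(values: java.lang.Iterable[String], out: Collector[Int]): Unit =
     out.collect(values.asScala.size)
 }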

To implement functionality similar to Spark Streaming's reduceByKey, you need to define a keyed window over the stream. Windows discretize a stream in a way that is somewhat similar to micro-batches, but windows offer much more flexibility. Since a window is of finite size, you can apply a reduce function to it.

It might look like this:

 yourStream.keyBy("myKey") // organize stream by key "myKey" .timeWindow(Time.seconds(5)) // build 5 sec tumbling windows .reduce(new YourReduceFunction); // apply a reduce function on each window 

The DataStream documentation shows how to define different kinds of windows and explains all the available functions.

Note: the DataStream API was redesigned recently. The example assumes the latest version (0.10-SNAPSHOT), which will be released as 0.10.0 in the coming days.


Assuming your input stream has single-partition data (e.g., of type String):

 import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala._

 val new_number_of_partitions = 4

 // The line below repartitions your data; you could also broadcast it to all partitions.
 val step1stream = yourStream.rescale.setParallelism(new_number_of_partitions)

 // A rich map function gives you flexibility for per-partition work.
 val step2stream = step1stream.map(new RichMapFunction[String, (String, Int)] {
   // var local_val_to_different_part: Type = null
   var myTaskId: Int = _

   // open() is executed once per mapper instance (one mapper per partition).
   override def open(config: Configuration): Unit = {
     myTaskId = getRuntimeContext.getIndexOfThisSubtask
     // Do whatever initialization you want here, e.g. read from data sources.
   }

   def map(value: String): (String, Int) = {
     (value, myTaskId)
   }
 })

 val step3stream = step2stream.keyBy(0).countWindow(new_number_of_partitions).sum(1).print()
 // Instead of sum(1), you can use .reduce((x, y) => (x._1, x._2 + y._2)).
 // .countWindow first waits for a certain number of records for a particular key
 // and then applies the function.
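If you instead want every record on every partition, as the broadcast comment above hints, a variant might look like this (hypothetical, reusing the same yourStream):

 // broadcast sends every record to all downstream subtasks
 // instead of spreading records across them as rescale does.
 val broadcastStream = yourStream.broadcast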

Flink streaming is pure streaming (not batch-based). Take a look at the Iterate API.
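Here is a minimal sketch of the Iterate API (Scala), based on the classic decrement-until-zero example from the Flink documentation; the input values are illustrative:

 import org.apache.flink.streaming.api.scala._

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val numbers: DataStream[Long] = env.fromElements(5L, 8L, 13L)

 // Each element is decremented; values > 0 are fed back into the loop,
 // values <= 0 leave the iteration and flow downstream.
 val done = numbers.iterate((input: DataStream[Long]) => {
   val minusOne = input.map(_ - 1)
   val feedback = minusOne.filter(_ > 0)
   val output = minusOne.filter(_ <= 0)
   (feedback, output)
 })

 done.print()
 env.execute("Iterate example")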


Source: https://habr.com/ru/post/1234738/

