Flink: how to handle external application configuration changes

My application processes a stream of millions of records per day, and the processing depends heavily on external configuration parameters. For example, a user can change a parameter in the web application at any time, and from that point on the stream must be processed with the new configuration. These are application-level settings. In addition, we have some dynamic exception parameters that must be passed in and used to filter every record.

I see that Flink does not have a global state that is accessible to all task managers and subtasks. A centralized cache is one option, but then every record would need a cache lookup for each parameter, which adds latency. What is the best way to handle these scenarios, and how do other applications handle them? Thanks.

Updating the configuration of a running streaming application is a common requirement. In the Flink DataStream API, this can be done with a so-called CoFlatMapFunction, which processes two input streams. One of the streams can be the data stream and the other a control stream.

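The following example shows how to dynamically adjust a function that filters strings by length: the data stream carries the strings, and the control stream carries the new length limit.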

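import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// placeholders: plug in your actual sources (e.g. Kafka topics)
val data: DataStream[String] = ???  // records to be filtered
val control: DataStream[Int] = ???  // new length limits

val filtered: DataStream[String] = data
  // broadcast all control messages to every parallel CoFlatMap subtask
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)

// Checkpointed requires a java.io.Serializable state type, hence java.lang.Integer.
// (Checkpointed is the stateful-function interface of the Flink 1.x releases of
// that time; later releases deprecated it in favor of CheckpointedFunction.)
class DynLengthFilter
    extends CoFlatMapFunction[String, Int, String]
    with Checkpointed[java.lang.Integer] {

  // current length limit; 0 drops every string until the first control message
  var length = 0

  // forward only strings shorter than the current limit
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive a new filter length from the control stream
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // include the current limit in each checkpoint
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): java.lang.Integer =
    length

  // restore the limit from the last checkpoint after a failure
  override def restoreState(state: java.lang.Integer): Unit = {
    length = state
  }
}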

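DynLengthFilter implements the Checkpointed interface so that the current filter length is written to every checkpoint. This matters for fault tolerance: after a failure, the length is restored from the last checkpoint instead of falling back to its initial value.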
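The example above keeps a single Int, but the question's "dynamic exception parameters" fit the same pattern with a collection as the local state. The sketch below is a minimal, Flink-free Scala simulation (so it runs without a Flink dependency) of what each parallel CoFlatMap subtask would do: control messages are applied to every subtask, as `control.broadcast` does, while each data record goes to only one subtask and is checked against a plain local Set, with no remote cache lookup per record. All names (`Record`, `Event`, `AddException`, `RemoveException`, `ExceptionFilter`, `simulate`) are hypothetical, and routing records by `id % parallelism` is only a stand-in for Flink's partitioning.

```scala
// Flink-free sketch: a broadcast control stream keeping a local copy of
// dynamic "exception" parameters in every parallel filter instance.
// All type and function names here are hypothetical, not Flink APIs.

final case class Record(id: Int, customer: String)

sealed trait Event
final case class DataEvent(record: Record) extends Event
final case class AddException(customer: String) extends Event    // start dropping a customer
final case class RemoveException(customer: String) extends Event // stop dropping it

// The state one CoFlatMap subtask would hold.
final class ExceptionFilter {
  private var excluded: Set[String] = Set.empty

  // plays the role of flatMap1: filter a data record using local state only
  def onData(r: Record): Option[Record] =
    if (excluded.contains(r.customer)) None else Some(r)

  // plays the role of flatMap2: apply a control message
  def onControl(e: Event): Unit = e match {
    case AddException(c)    => excluded += c
    case RemoveException(c) => excluded -= c
    case DataEvent(_)       => ()
  }
}

object ExceptionFilterDemo {
  // Data events go to exactly one subtask (id modulo parallelism);
  // control events are delivered to all subtasks, i.e. broadcast.
  def simulate(events: Seq[Event], parallelism: Int): Seq[Record] = {
    val subtasks = Vector.fill(parallelism)(new ExceptionFilter)
    events.flatMap {
      case DataEvent(r) => subtasks(r.id % parallelism).onData(r).toList
      case control =>
        subtasks.foreach(_.onControl(control))
        Seq.empty
    }
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      DataEvent(Record(1, "acme")),
      DataEvent(Record(2, "globex")),
      AddException("acme"),
      DataEvent(Record(3, "acme")), // dropped by subtask 1
      DataEvent(Record(4, "acme")), // dropped by subtask 0
      RemoveException("acme"),
      DataEvent(Record(5, "acme"))
    )
    println(simulate(events, parallelism = 2).map(_.id)) // List(1, 2, 5)
  }
}
```

Records 3 and 4 land on different subtasks and are both dropped, which only works because the update reached every subtask. One caveat the simulation hides: here everything arrives in a single ordered sequence, whereas in Flink the two inputs of a connected stream are not synchronized, so records processed shortly after a configuration change may still be filtered with the previous parameters.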

Source: https://habr.com/ru/post/1655774/

