Apache Flink - using values from a data stream to dynamically create a streaming data source

I am trying to create an example application using Apache Flink that does the following:

  • Reads a stream of stock symbols (for example, "CSCO", "FB") from a Kafka topic.
  • For each symbol, performs a real-time lookup of the current price and streams the values for downstream processing.

* Update to the original post *

I moved the map function into a separate class and no longer get the runtime error: "The implementation of the MapFunction is not serializable. The object probably contains or references non-serializable fields."

The problem I am currently facing is that the Kafka topic "stockprices" that I am writing to does not receive any messages. I am still troubleshooting this and will post any updates.

public class RetrieveStockPrices {
    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "stocks");

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(
            new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties));

        DataStream<String> stockPrice = streamOfStockSymbols
            // get unique keys
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String trend) throws Exception {
                    return trend;
                }
            })
            // collect events over a window
            .window(TumblingEventTimeWindows.of(Time.seconds(60)))
            // return the last event from the window...all elements have the same symbol
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception {
                    out.collect(input.iterator().next().toString());
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices");
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(
            new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));
        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}

public class LookupStockPrice extends RichSourceFunction<String> {
    public String stockSymbol = null;
    public boolean isRunning = true;

    public LookupStockPrice(String inSymbol) {
        stockSymbol = inSymbol;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        isRunning = true;
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        String stockPrice = "0";
        while (isRunning) {
            //TODO: query Google Finance API
            stockPrice = Integer.toString((new Random()).nextInt(100) + 1);
            ctx.collect(stockPrice);
            Thread.sleep(10000);
        }
    }
}
1 answer

StreamExecutionEnvironment is not intended to be used inside the operators of a streaming application. "Not intended" means it is neither tested nor encouraged: it may work and do something, but it will most likely not behave well and will probably kill your application.

The StockSymbolToPriceMapFunction in your program defines, for each incoming record, a completely new and independent streaming application. However, since you never call streamExecEnv.execute(), these programs are never started, and the map method returns without doing anything.
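The lazy-execution point is worth emphasizing: a Flink DataStream program is only a dataflow description until execute() is called. A minimal plain-Java analogy of that behavior (the Pipeline class below is a hypothetical stand-in, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class LazyExecutionDemo {

    // A toy lazy pipeline: transformations are only recorded, not run,
    // until execute() is called -- analogous to how DataStream.map() only
    // adds an operator to the job graph.
    static class Pipeline {
        private final List<UnaryOperator<String>> steps = new ArrayList<>();
        private final String input;

        Pipeline(String input) {
            this.input = input;
        }

        Pipeline map(UnaryOperator<String> step) {
            steps.add(step); // recorded, not executed
            return this;
        }

        // Nothing runs until here -- analogous to streamExecEnv.execute().
        String execute() {
            String value = input;
            for (UnaryOperator<String> step : steps) {
                value = step.apply(value);
            }
            return value;
        }
    }

    public static void main(String[] args) {
        Pipeline p = new Pipeline("csco").map(String::toUpperCase);
        // Without the execute() call, the map step would never have run.
        System.out.println(p.execute()); // prints "CSCO"
    }
}
```

This is why the nested program inside StockSymbolToPriceMapFunction silently does nothing: its environment is built but never executed.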

If you did call streamExecEnv.execute(), the function would start a new local Flink cluster inside the worker's JVM and run the application on that local cluster. Each local Flink instance takes a lot of heap space, and after a few clusters have been started the worker will probably die with an OutOfMemoryError, which is not what you want.
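A way out, consistent with the answer above, is to perform the price lookup as an ordinary call inside the map function and let the single outer job carry the result on to its Kafka sink, instead of building a nested environment per record. A minimal plain-Java sketch of that shape (PriceLookup, the fake service, and the output format are hypothetical illustrations, not Flink API; in the real job this body would live inside StockSymbolToPriceMapFunction.map()):

```java
import java.util.Map;
import java.util.function.ToDoubleFunction;

public class DirectLookupDemo {

    // Stand-in for the external price service; the real code would query
    // an HTTP API (e.g. a finance endpoint) here instead.
    interface PriceLookup extends ToDoubleFunction<String> {}

    // The shape map() should take: call the lookup directly and return the
    // result. No new StreamExecutionEnvironment, no nested job -- the outer
    // dataflow forwards the returned value to its Kafka sink.
    static String mapSymbolToPrice(String stockSymbol, PriceLookup lookup) {
        double price = lookup.applyAsDouble(stockSymbol.trim());
        return stockSymbol.trim() + "," + price;
    }

    public static void main(String[] args) {
        // A fake in-memory "service" for demonstration purposes.
        PriceLookup fakeService = Map.of("CSCO", 47.5, "FB", 320.0)::get;
        System.out.println(mapSymbolToPrice(" CSCO ", fakeService)); // prints "CSCO,47.5"
    }
}
```

Because the lookup here is blocking, each call stalls the operator for the duration of the request; if that becomes a bottleneck, Flink's async I/O facilities are the usual next step, but the structural fix above (one job, lookup inside the function) is the essential change.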


Source: https://habr.com/ru/post/1261460/

