I am trying to create an example application using Apache Flink that does the following:
- Reads a stream of stock symbols (for example, "CSCO", "FB") from a Kafka topic.
- For each symbol, performs a real-time lookup of the current price and streams the resulting values downstream for further processing.
* Update to original post *
I moved the map function into a separate class and no longer get the following error message at runtime: "The implementation of the MapFunction is not serializable. The object probably contains or references non serializable fields."
The problem I am facing now is that the Kafka topic "stockprices" that I am trying to write to does not receive any messages. I am still troubleshooting and will post any updates.
public class RetrieveStockPrices { @SuppressWarnings("serial") public static void main(String[] args) throws Exception { final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "stocks"); DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); DataStream<String> stockPrice = streamOfStockSymbols //get unique keys .keyBy(new KeySelector<String, String>() { @Override public String getKey(String trend) throws Exception { return trend; } }) //collect events over a window .window(TumblingEventTimeWindows.of(Time.seconds(60))) //return the last event from the window...all elements are the same "Symbol" .apply(new WindowFunction<String, String, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { out.collect(input.iterator().next().toString()); } }) .map(new StockSymbolToPriceMapFunction()); streamExecEnv.execute("Retrieve Stock Prices"); } } public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> { @Override public String map(String stockSymbol) throws Exception { final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol); DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol)); stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new 
SimpleStringSchema())); return "100000"; } private static class CustomKeySelector implements KeySelector<String, String> { @Override public String getKey(String arg0) throws Exception { return arg0.trim(); } } } public class LookupStockPrice extends RichSourceFunction<String> { public String stockSymbol = null; public boolean isRunning = true; public LookupStockPrice(String inSymbol) { stockSymbol = inSymbol; } @Override public void open(Configuration parameters) throws Exception { isRunning = true; } @Override public void cancel() { isRunning = false; } @Override public void run(SourceFunction.SourceContext<String> ctx) throws Exception { String stockPrice = "0"; while (isRunning) { //TODO: query Google Finance API stockPrice = Integer.toString((new Random()).nextInt(100)+1); ctx.collect(stockPrice); Thread.sleep(10000); } } }
source share