Apache Flink-使用数据流中的值动态创建流数据源 [英] Apache Flink - use values from a data stream to dynamically create a streaming data source

查看:17
本文介绍了Apache Flink-使用数据流中的值动态创建流数据源的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Apache Flink构建执行以下操作的示例应用程序:

  1. 读取股票代码流(例如‘CSCO’,‘fb’)。
  2. 对每个符号执行当前价格的实时查找,并将值流式传输以进行下游处理。

*更新为原始帖子*

我将map函数移到单独的类中,没有收到运行时错误消息"MapFunction的实现不再是可序列化的。该对象可能包含或引用了不可序列化的字段"。

我现在面临的问题是,我试图写价格的卡夫卡主题"股票价格"没有收到它们。我正在试着解决问题,并将发布任何更新。

public class RetrieveStockPrices { 
    @SuppressWarnings("serial") 
    public static void main(String[] args) throws Exception { 
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

        Properties properties = new Properties(); 
        properties.setProperty("bootstrap.servers", "localhost:9092"); 
        properties.setProperty("zookeeper.connect", "localhost:2181"); 
        properties.setProperty("group.id", "stocks"); 

        DataStream<String> streamOfStockSymbols = streamExecEnv.addSource(new FlinkKafkaConsumer08<String>("stocksymbol", new SimpleStringSchema(), properties)); 

        DataStream<String> stockPrice = 
            streamOfStockSymbols 
            //get unique keys 
            .keyBy(new KeySelector<String, String>() { 
                @Override 
                public String getKey(String trend) throws Exception {
                    return trend; 
                }
                }) 
            //collect events over a window 
            .window(TumblingEventTimeWindows.of(Time.seconds(60))) 
            //return the last event from the window...all elements are the same "Symbol" 
            .apply(new WindowFunction<String, String, String, TimeWindow>() {
                @Override 
                public void apply(String key, TimeWindow window, Iterable<String> input, Collector<String> out) throws Exception { 
                    out.collect(input.iterator().next().toString()); 
                }
            })
            .map(new StockSymbolToPriceMapFunction());

        streamExecEnv.execute("Retrieve Stock Prices"); 
    }
}

public class StockSymbolToPriceMapFunction extends RichMapFunction<String, String> {
    @Override
    public String map(String stockSymbol) throws Exception {
        final StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        System.out.println("StockSymbolToPriceMapFunction: stockSymbol: " + stockSymbol);

        DataStream<String> stockPrices = streamExecEnv.addSource(new LookupStockPrice(stockSymbol));
        stockPrices.keyBy(new CustomKeySelector()).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "stockprices", new SimpleStringSchema()));

        return "100000";
    }

    private static class CustomKeySelector implements KeySelector<String, String> {
        @Override
        public String getKey(String arg0) throws Exception {
            return arg0.trim();
        }
    }
}


public class LookupStockPrice extends RichSourceFunction<String> { 
    public String stockSymbol = null; 
    public boolean isRunning = true; 

    public LookupStockPrice(String inSymbol) { 
            stockSymbol = inSymbol; 
    } 

    @Override 
    public void open(Configuration parameters) throws Exception { 
            isRunning = true; 
    } 


    @Override 
    public void cancel() { 
            isRunning = false; 
    } 

    @Override 
    public void run(SourceFunction.SourceContext<String> ctx) 
                    throws Exception { 
            String stockPrice = "0";
            while (isRunning) { 
                //TODO: query Google Finance API 
                stockPrice = Integer.toString((new Random()).nextInt(100)+1);
                ctx.collect(stockPrice);
                Thread.sleep(10000);
            } 
    } 
}

推荐答案

StreamExecutionEnvironment不缩进以在流式应用的运算符内部使用。不是有意的意思,这不是测试和鼓励的。它可能会正常工作并执行某些操作,但很可能会表现不佳,并且可能会终止您的应用程序。

程序中的StockSymbolToPriceMapFunction为每个传入记录指定一个全新且独立的新流应用程序。但是,由于您不调用streamExecEnv.execute(),程序不会启动,并且map方法不执行任何操作而返回。

如果调用streamExecEnv.execute(),该函数将在工作器JVM中启动一个新的本地Flink集群,并在该本地Flink集群上启动应用程序。本地Flink实例将占用大量堆空间,并且在启动几个群集后,工作进程可能会因OutOfMemoryError而死亡,这不是您希望发生的情况。

这篇关于Apache Flink-使用数据流中的值动态创建流数据源的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆