Flink Kafka - how to make App run in Parallel?


Question



I am creating an app in Flink to

  1. Read Messages from a topic
  2. Do some simple process on it
  3. Write Result to a different topic

My code does work, but it does not run in parallel.
How do I make it run in parallel?
It seems my code runs on only one thread/block.

On the Flink Web Dashboard:

  • App goes to running status
  • But, there is only one block shown in the overview subtasks
  • And Bytes Received/Sent and Records Received/Sent are always zero (no update)

Here is my code. Please help me learn how to split my app so it can run in parallel, and tell me whether I am writing the app correctly.

public class SimpleApp {

    public static void main(String[] args) throws Exception {

        // create execution environment INPUT
        StreamExecutionEnvironment env_in =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // event time characteristic
        env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // production Ready (Does NOT Work if greater than 1)
        env_in.setParallelism(Integer.parseInt(args[0]));

        // configure kafka consumer
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");

        // create a kafka consumer
        final DataStream<String> consumer = env_in.addSource(
                new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

        // filter data
        SingleOutputStreamOperator<String> result =
                consumer.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.substring(0, 2).contentEquals("PS");
            }
        });

        // Process Data
        // Transform String Records to JSON Objects
        SingleOutputStreamOperator<JSONObject> data =
                result.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                JSONObject jsnobj = new JSONObject();

                if (value.substring(0, 2).contentEquals("PS")) {
                    // 1. Raw Data
                    jsnobj.put("Raw_Data", value.substring(0, value.length() - 6));

                    // 2. Comment
                    int first_index_comment = value.indexOf("$");
                    int last_index_comment  = value.lastIndexOf("$") + 1;
                    //   - set comment
                    String comment =
                            value.substring(first_index_comment, last_index_comment);
                    comment = comment.substring(0, comment.length() - 6);
                    jsnobj.put("Comment", comment);
                } else {
                    jsnobj.put("INVALID", value);
                }

                return jsnobj;
            }
        });

        // Write JSON to Kafka Topic
        data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
                "FilteredData",
                new SimpleJsonSchema()));

        env_in.execute();
    }
}
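One robustness note on the code above: `s.substring(0, 2)` throws `StringIndexOutOfBoundsException` for any record shorter than two characters, which would fail the whole job. A length-safe equivalent of the filter predicate, sketched as a plain Java helper (the sample inputs are hypothetical):

```java
public class SafePsFilter {
    // Equivalent to s.substring(0, 2).contentEquals("PS") for strings of
    // length >= 2, but returns false instead of throwing on short or null input.
    static boolean isPsRecord(String s) {
        return s != null && s.startsWith("PS");
    }

    public static void main(String[] args) {
        System.out.println(isPsRecord("PS1234$comment$XYZ")); // true
        System.out.println(isPsRecord("P"));                  // false, no exception
        System.out.println(isPsRecord(null));                 // false
    }
}
```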

My code does work, but it seems to run on only a single thread (one block shown in the web interface), with no data passing through, hence the bytes sent/received are never updated.

How do I make it run in parallel?

Solution

To run your job in parallel you can do two things:

  1. Increase the parallelism of your job at the env level - i.e. do something like

StreamExecutionEnvironment env_in = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);

But this only increases parallelism on the Flink side after the data has been read, so if the source produces data faster than it can be read, the parallelism might not be fully utilized.

  2. To fully parallelize your job, set up multiple partitions for your Kafka topic, ideally matching the amount of parallelism you want for your Flink job. So you might want to do something like the following when creating your Kafka topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic test
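The reason the partition count matters: Flink assigns each Kafka partition to exactly one source subtask, so any source subtasks beyond the number of partitions have nothing to read. A minimal pure-Java sketch of a round-robin partition-to-subtask assignment illustrates this (illustrative only, not Flink's actual internal assignment logic):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentDemo {
    // Round-robin assignment of Kafka partitions to source subtasks.
    // Illustrative only; not Flink's exact internal algorithm.
    static Map<Integer, List<Integer>> assign(int partitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new HashMap<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.put(s, new ArrayList<Integer>());
        }
        for (int p = 0; p < partitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 1 partition, parallelism 4: three subtasks have nothing to read
        System.out.println(assign(1, 4));
        // 4 partitions, parallelism 4: every subtask reads one partition
        System.out.println(assign(4, 4));
    }
}
```

With one partition, only subtask 0 receives data no matter how high the job's parallelism is set, which matches the single active block seen in the dashboard.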
