Flink Kafka - how to make App run in Parallel?


Question

I am creating an app in Flink to

  1. read messages from a topic
  2. do some simple processing on them
  3. write the results to another topic

My code does work, but it does not run in parallel.
How do I do that?
It seems my code runs only on one thread/block.

On the Flink Web Dashboard:

  • the application goes into the RUNNING state
  • however, only one block is shown in the subtasks overview
  • bytes received/sent and records received/sent are always zero (no updates)

Here is my code. Please assist me in learning how to split my app so it can run in parallel, and am I writing the app correctly?

public class SimpleApp {

    public static void main(String[] args) throws Exception {

        // create execution environment INPUT
        StreamExecutionEnvironment env_in =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // event time characteristic
        env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // production Ready (Does NOT Work if greater than 1)
        env_in.setParallelism(Integer.parseInt(args[0]));

        // configure kafka consumer
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");

        // create a kafka consumer
        final DataStream<String> consumer = env_in
                .addSource(new FlinkKafkaConsumer09<>("test",
                        new SimpleStringSchema(), properties));

        // filter data
        SingleOutputStreamOperator<String> result =
                consumer.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.substring(0, 2).contentEquals("PS");
            }
        });

        // Process Data
        // Transform String Records to JSON Objects
        SingleOutputStreamOperator<JSONObject> data =
                result.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception
            {
                JSONObject jsnobj = new JSONObject();

                if(value.substring(0, 2).contentEquals("PS"))
                {
                    // 1. Raw Data
                    jsnobj.put("Raw_Data", value.substring(0, value.length()-6));

                    // 2. Comment
                    int first_index_comment = value.indexOf("$");
                    int last_index_comment  = value.lastIndexOf("$") + 1;
                    //   - set comment
                    String comment = value.substring(first_index_comment,
                            last_index_comment);
                    comment = comment.substring(0, comment.length()-6);
                    jsnobj.put("Comment", comment);
                }
                else {
                    jsnobj.put("INVALID", value);
                }

                return jsnobj;
            }
        });

        // Write JSON to Kafka Topic
        data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
                "FilteredData",
                new SimpleJsonSchema()));

        env_in.execute();
    }
}

My code does work, but it seems to run only on a single thread (one block shown in the web interface; no data is passed, hence the bytes sent/received are not updated).

How do I make it run in parallel?

Answer

To run your job in parallel you can do two things:

  1. Increase the parallelism of your job at the environment level, i.e. do something like:

StreamExecutionEnvironment env_in = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);

But this only increases parallelism on the Flink side after the data has been read, so if the source produces data faster, that parallelism might not be fully utilized.
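The effect above can be illustrated with a plain-Java toy model (this is not Flink code; the class and names are hypothetical). Four "map" worker threads drain a shared queue, but every record still enters the pipeline through one single-threaded source loop, so the source caps throughput no matter how many workers run downstream:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleSourceBottleneck {
    public static int run(int records, int workers) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        // Parallel "map" subtasks: they all drain the same queue.
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        String rec = queue.take();
                        if (rec.equals("EOF")) break; // poison pill per worker
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException ignored) { }
            });
        }

        // Single "source" thread: the only place records enter the pipeline,
        // so overall throughput is bounded by this loop.
        for (int i = 0; i < records; i++) queue.put("record-" + i);
        for (int i = 0; i < workers; i++) queue.put("EOF");

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(1000, 4)); // prints 1000
    }
}
```

All 1000 records are processed, but they were all produced by one thread; adding more workers cannot make records arrive any faster than the single source emits them.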

  2. To fully parallelize your job, set up multiple partitions for your Kafka topic, ideally as many as the parallelism your Flink job needs. So while creating the Kafka topic, you might want to do something like:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic test
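A rough sketch of why the partition count matters: each Kafka partition is read by exactly one consumer subtask. The model below uses plain modulo assignment (an assumption for illustration; Flink's real Kafka assigner also applies a per-topic start offset) to show which subtasks receive data:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionAssignment {
    // Simplified model: partition p is read by subtask (p % parallelism).
    static Map<Integer, List<Integer>> assign(int partitions, int parallelism) {
        Map<Integer, List<Integer>> bySubtask = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            bySubtask.computeIfAbsent(p % parallelism, k -> new ArrayList<>()).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 1 partition, parallelism 4: only subtask 0 ever receives records.
        System.out.println(assign(1, 4)); // {0=[0]}
        // 4 partitions, parallelism 4: every subtask reads one partition.
        System.out.println(assign(4, 4)); // {0=[0], 1=[1], 2=[2], 3=[3]}
    }
}
```

With a single-partition topic, three of the four subtasks are idle, which matches the single active block seen in the dashboard; with four partitions, all four subtasks receive data.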
