Flink Kafka - how to make App run in Parallel?
Problem Description
I am creating an app in Flink to:

- read the messages from a topic
- do some simple processing on them
- write the results to another topic

My code does work, however it does not run in parallel. How do I do that? It seems my code runs on only one thread/block?
On the Flink Web Dashboard:

- the app goes to the RUNNING state
- but only one block is shown in the overview of subtasks
- bytes received/sent and records received/sent are always zero (no updates)
Here is my code. Please assist me in learning how to split my app so that it is able to run in parallel, and am I writing the app correctly?
import java.util.Properties;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
// assumption: JSONObject comes from a JSON library such as org.json;
// SimpleJsonSchema is a custom serialization schema defined elsewhere
import org.json.JSONObject;

public class SimpleApp {

    public static void main(String[] args) throws Exception {
        // create execution environment INPUT
        StreamExecutionEnvironment env_in =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // event time characteristic
        env_in.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // production ready (does NOT work if greater than 1)
        env_in.setParallelism(Integer.parseInt(args[0]));

        // configure Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        final DataStream<String> consumer = env_in.addSource(
                new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties));

        // filter data: keep only records starting with "PS"
        SingleOutputStreamOperator<String> result = consumer.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.substring(0, 2).contentEquals("PS");
            }
        });

        // process data: transform String records to JSON objects
        SingleOutputStreamOperator<JSONObject> data = result.map(new MapFunction<String, JSONObject>() {
            @Override
            public JSONObject map(String value) throws Exception {
                JSONObject jsnobj = new JSONObject();
                if (value.substring(0, 2).contentEquals("PS")) {
                    // 1. raw data
                    jsnobj.put("Raw_Data", value.substring(0, value.length() - 6));
                    // 2. comment, delimited by "$" characters
                    int first_index_comment = value.indexOf("$");
                    int last_index_comment = value.lastIndexOf("$") + 1;
                    String comment = value.substring(first_index_comment, last_index_comment);
                    comment = comment.substring(0, comment.length() - 6);
                    jsnobj.put("Comment", comment);
                } else {
                    jsnobj.put("INVALID", value);
                }
                return jsnobj;
            }
        });

        // write JSON to a Kafka topic
        data.addSink(new FlinkKafkaProducer09<JSONObject>("localhost:9092",
                "FilteredData", new SimpleJsonSchema()));

        env_in.execute();
    }
}
My code does work, but it seems to run on only a single thread (one block shown in the web interface, with no passing of data, hence the bytes sent/received are not updated).

How do I make it run in parallel?
Answer
To run your job in parallel you can do two things:
- Increase the parallelism of your job at the environment level, i.e. do something like:

StreamExecutionEnvironment env_in = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4);
But this only increases the parallelism on the Flink side after it reads the data, so if the source is producing data faster, it might not be fully utilized.
- To fully parallelize your job, set up multiple partitions for your Kafka topic, ideally the same number as the parallelism you want for your Flink job. So you may want to do the following when creating your Kafka topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic test
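The reason the partition count caps the source parallelism: each Kafka partition is read by exactly one consumer subtask, so with fewer partitions than subtasks, some subtasks receive nothing and sit idle (which matches the single active block seen in the dashboard). The exact assignment scheme inside the Flink Kafka connector is an implementation detail; the sketch below uses a hypothetical round-robin distribution purely to illustrate the effect:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper: spread partition ids over subtasks round-robin.
    // Returns one list of partition ids per subtask.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            subtasks.get(p % parallelism).add(p);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // 1 partition, parallelism 4: only subtask 0 gets a partition,
        // the other three subtasks read nothing
        System.out.println(assign(1, 4));
        // 4 partitions, parallelism 4: every subtask reads exactly one partition
        System.out.println(assign(4, 4));
    }
}
```

With a 4-partition topic as created above, a source parallelism of 4 keeps all subtasks busy, which is why matching the topic's partition count to the job's parallelism is the recommended setup.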