Flink on Yarn, parallel source with Kafka


Question

I am trying to read from my Kafka source in parallel within my Flink job, but so far I have failed.

I created my Kafka topic with 4 partitions:

$ ./bin/kafka-topics.sh --describe --zookeeper X.X.X.X:2181 --topic mytopic
Topic:mytopic   PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: mytopic  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 3    Leader: 0   Replicas: 0 Isr: 0

My Scala code is as follows:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.getConfig.setGlobalJobParameters(params)

    // **** Kafka CONNECTION ****
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", params.get("server"))
    properties.setProperty("group.id", "test")

    // **** Get KAFKA source ****
    val stream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))

I run my job on YARN:

$ ./bin/flink run -m yarn-cluster -yn 4 -yjm 8192 -ynm test -ys 1 -ytm 8192 myjar.jar --server X.X.X.X:9092 --topic mytopic

I tried a bunch of things, but my source is still not parallelized.

Having several Kafka partitions and at least as many slots / TaskManagers should be enough, right?

Recommended answer

I had the same issue. I would suggest checking two things.

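  1. When you implement the producer, check whether you are producing the same key for every record sent to Kafka. Records with the same key all go to the same partition, so you should either use a well-distributed key or simply set the key to null.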

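// Keyed record: the key determines the partition
producer.send(new ProducerRecord("topicName", "yourKey", "yourMessage"))

// Null key: the producer chooses the partition itself
producer.send(new ProducerRecord("topicName", null, "yourMessage"))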

  2. Check that your Kafka producer library version is compatible with your Kafka consumer library version.
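
To illustrate the first point, here is a minimal, dependency-free sketch of why a constant key defeats source parallelism. It is a simplified model, not Kafka's actual code: the hypothetical `partitionFor` helper uses `hashCode` as a stand-in for the hash that Kafka's default partitioner applies to the serialized key, and `numPartitions` is set to 4 only to match the topic in the question. The names `KeyDistributionSketch` and `countPerPartition` are made up for this example.

```scala
// Simplified model of keyed partitioning (assumption: hashCode stands in
// for the hash Kafka's default partitioner applies to the serialized key).
object KeyDistributionSketch {
  // Matches the 4-partition topic described in the question.
  val numPartitions = 4

  // Hypothetical stand-in for the partitioner: the same key always
  // yields the same partition index in [0, numPartitions).
  def partitionFor(key: String): Int =
    java.lang.Math.floorMod(key.hashCode, numPartitions)

  // Counts how many records land in each partition for the given keys.
  def countPerPartition(keys: Seq[String]): Map[Int, Int] =
    keys.groupBy(k => partitionFor(k)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    // Every record carries the same key, as in the suspected bug.
    val constantKeys = Seq.fill(1000)("yourKey")
    // Every record carries a different key.
    val variedKeys = (0 until 1000).map(i => s"key-$i")

    println(s"constant key: ${countPerPartition(constantKeys)}")
    println(s"varied keys:  ${countPerPartition(variedKeys)}")
  }
}
```

With a constant key, all records end up in a single partition, so only the one Flink source subtask that reads that partition receives any data, even though the job runs with parallelism 4. With varied keys (or a null key, where the producer spreads records across partitions itself), every partition receives records and every source subtask has work to do.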
