Spark Streaming direct Kafka consumers are not evenly distributed across executors


Question

I have created a sample direct Kafka stream in Spark. Kafka has 30 partitions for the given topic, but all consumers are executing on the same executor machine.

Kafka Manager screenshot.

As I understand the direct Kafka stream, the driver computes the offset ranges for each batch and hands them to the executors, which poll Kafka with them.
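`LocationStrategies.PreferConsistent` (used in the code below) is documented to spread the topic's partitions evenly across the available executors. A minimal plain-Java sketch of that round-robin placement, under the stated assumption of 30 partitions and 3 executors (the `assign` helper and executor names are hypothetical, not Spark's API):

```java
import java.util.*;

public class AssignmentSketch {
    // Hypothetical stand-in for PreferConsistent-style placement: partitions
    // are dealt out round-robin over the executor list, so 30 partitions on
    // 3 executors should land as 10 tasks per executor, not all on one machine.
    static Map<String, List<Integer>> assign(List<Integer> partitions, List<String> executors) {
        Map<String, List<Integer>> placement = new LinkedHashMap<>();
        for (String e : executors) placement.put(e, new ArrayList<>());
        for (int i = 0; i < partitions.size(); i++) {
            placement.get(executors.get(i % executors.size())).add(partitions.get(i));
        }
        return placement;
    }

    public static void main(String[] args) {
        List<Integer> partitions = new ArrayList<>();
        for (int p = 0; p < 30; p++) partitions.add(p);
        Map<String, List<Integer>> placement =
                assign(partitions, Arrays.asList("exec-1", "exec-2", "exec-3"));
        placement.forEach((e, ps) -> System.out.println(e + " -> " + ps.size() + " partitions"));
    }
}
```

If all tasks still land on one executor, the usual suspects are that only one executor is actually registered when the batch is scheduled, or that locality settings pull tasks together; the sketch only illustrates the intended spread.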

Spark version: 2.4

Sample code below:



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {


    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");


        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");

        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            System.out.println("Processing test-topic-1");
            try {
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

        kafkaStream1.foreachRDD(rdd -> {
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
            rdd.foreachPartition(partition -> {
                OffsetRange o = beginOffsets[TaskContext.get().partitionId()];

            });
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
        });



        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"hrishi-testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}


Answer

I found the issue: it was happening because I was committing offsets from the driver. This is the code.

((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);

    kafkaStream1.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        rdd.foreachPartition(partition -> {
            partition.forEachRemaining(e -> {
                try {
                    System.out.println("hrishi mess" + e);
                    Thread.sleep(2);
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
        });
        // Commit this batch's own offset ranges from the driver, after processing.
        ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
    });
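The point of the pattern above is ordering: the executors process the batch first, and only then does the driver commit that batch's own offset ranges, so the committed position moves forward monotonically instead of being rewound by ranges built from offset 0. A toy plain-Java model of that progression (not the Kafka or Spark API; the range arrays are hypothetical):

```java
import java.util.*;

public class CommitSketch {
    // Toy model: each batch covers a [fromOffset, untilOffset) range for one
    // partition. Committing after processing advances the position to the
    // batch's untilOffset, so it never regresses across batches.
    static List<Long> committedAfterEachBatch(long[][] batchRanges) {
        List<Long> committed = new ArrayList<>();
        long position = 0L;
        for (long[] range : batchRanges) {
            // range[0] = fromOffset, range[1] = untilOffset for this batch
            position = range[1];      // commit once processing has completed
            committed.add(position);
        }
        return committed;
    }

    public static void main(String[] args) {
        long[][] ranges = {{0, 100}, {100, 250}, {250, 400}};
        System.out.println(committedAfterEachBatch(ranges)); // [100, 250, 400]
    }
}
```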

Next, I enabled debug logging on the executors and found that KafkaRDD was polling Kafka there; it is clearly visible in the log.
