Round Robin message assignment to partition not working for messages without key

Problem description

I have created a topic first_topic and produced messages to it.

for (int i = 0; i < 10; i++) {
    // create a producer record without a key
    ProducerRecord<String, String> record =
            new ProducerRecord<>("first_topic", "hello world " + i);
    // send the data asynchronously
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // executes every time a record is sent or an exception occurs
            if (e == null) {
                // the record was successfully sent
                logger.info("Received new metadata \n" +
                        "Topic : " + recordMetadata.topic() + "\n" +
                        "Partition : " + recordMetadata.partition() + "\n" +
                        "Offset : " + recordMetadata.offset() + "\n" +
                        "Timestamp : " + recordMetadata.timestamp());
            } else {
                e.printStackTrace();
                logger.error("Error while producing record", e);
            }
        }
    });
}

But all messages go to partition #2. I expected them to be spread across all 3 partitions in round-robin fashion, but they are not (see below). What am I doing wrong?

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-third-application

Consumer group 'my-third-application' has no active members.

GROUP                TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-third-application first_topic     0          0               0               0               -               -               -
my-third-application first_topic     1          0               0               0               -               -               -
my-third-application first_topic     2          10              10              0               -               -               -

Solution

I ran into this problem too. The cause is not the round-robin partitioner itself but the producer's doSend method. When the accumulator returns a result with the abortForNewBatch flag set to true, doSend calls the partition method a second time, so the partition selected by the first call is skipped. This is especially harmful when the topic has only two partitions, because in that case only one partition will be used.
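The skipping effect can be reproduced with a toy model that involves no Kafka classes. The sketch below assumes the worst case, in which every send opens a new batch so that the partitioner is consulted twice per record; the class and method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the double partition() call; not Kafka code. */
class SkippedPartitionDemo {

    /**
     * Returns the partitions that receive records when every send opens a
     * new batch (assumption), so each record advances the counter twice.
     */
    static List<Integer> partitionsUsed(int numPartitions, int numRecords) {
        AtomicInteger counter = new AtomicInteger(0);
        List<Integer> used = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            // First call: the partition chosen here is abandoned.
            counter.getAndIncrement();
            // Second call: the record is appended to this partition.
            used.add(counter.getAndIncrement() % numPartitions);
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("2 partitions: " + partitionsUsed(2, 6)); // [1, 1, 1, 1, 1, 1]
        System.out.println("3 partitions: " + partitionsUsed(3, 6)); // [1, 0, 2, 1, 0, 2]
    }
}
```

Under this assumption a two-partition topic only ever sees partition 1, which matches the case described above. With three partitions every partition is still reached, only in a different order.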

The doSend method:

...
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

if (result.abortForNewBatch) {
    int prevPartition = partition;
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

    result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
...
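Note that the excerpt passes the abandoned partition to onNewBatch before it asks for a partition again. The toy model below is a sketch, not Kafka code: it assumes a partitioner that remembers that value and hands it back on the next call, and it again assumes every send opens a new batch. All names are hypothetical and the model is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy partitioner that reuses the partition reported to onNewBatch. */
class ReusedPartitionDemo {
    private final int numPartitions;
    private int counter = 0;
    private Integer unused = null; // partition abandoned by the caller, if any

    ReusedPartitionDemo(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    int partition() {
        if (unused != null) {
            int p = unused; // hand back the abandoned partition first
            unused = null;
            return p;
        }
        return counter++ % numPartitions;
    }

    void onNewBatch(int prevPartition) {
        unused = prevPartition;
    }

    /** Mimics the call order in the excerpt: partition, onNewBatch, partition. */
    static List<Integer> partitionsUsed(int numPartitions, int numRecords) {
        ReusedPartitionDemo p = new ReusedPartitionDemo(numPartitions);
        List<Integer> used = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            int first = p.partition(); // first choice, abandoned by the caller
            p.onNewBatch(first);       // caller reports the abandoned partition
            used.add(p.partition());   // second call returns the same partition
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("2 partitions: " + partitionsUsed(2, 6)); // [0, 1, 0, 1, 0, 1]
        System.out.println("3 partitions: " + partitionsUsed(3, 6)); // [0, 1, 2, 0, 1, 2]
    }
}
```

In this model the rotation stays intact because the second call no longer advances the counter.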

This problem can be fixed by using a custom round-robin partitioner like this:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class CustomRoundRobinPartitioner implements Partitioner {

    // per-topic round-robin counters
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    // per-topic partition that doSend abandoned when it had to create a new batch
    private final ConcurrentMap<String, AtomicInteger> unusedPartition = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // remove() is atomic, so two threads cannot both claim the same entry
        // (a separate containsKey check followed by remove could return null)
        AtomicInteger unused = unusedPartition.remove(topic);
        if (unused != null)
            return unused.get();

        return nextPartition(topic, cluster);
    }

    public int nextPartition(String topic, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = counterNextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int counterNextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // remember the abandoned partition so the next partition() call reuses it
        unusedPartition.put(topic, new AtomicInteger(prevPartition));
    }
}
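For the producer to pick up the custom class, it has to be registered through the partitioner.class producer setting. The sketch below builds such a configuration with plain java.util.Properties and string keys, so it compiles without the Kafka client on the classpath. The package com.example is an assumption (the answer does not give one), and the String serializers are assumed from the String records used in the question.

```java
import java.util.Properties;

/** Builds producer settings that select the custom partitioner. */
class ProducerConfigSketch {

    static Properties producerProperties() {
        Properties props = new Properties();
        // broker address taken from the question's CLI command
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // hypothetical package; use the fully qualified name of your own class
        props.setProperty("partitioner.class",
                "com.example.CustomRoundRobinPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProperties().getProperty("partitioner.class"));
    }
}
```

These properties would then be passed to the KafkaProducer constructor in place of the producer's existing configuration.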
