Custom Partitioner for Plain Producer | Kafka Streams

Question

I have a Kafka Streams application that sets

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

RoundRobinPartitioner is a class in Kafka 2.4 that distributes messages across partitions even when they share the same key.

RoundRobinPartitioner has this implementation:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

My partitioner consists of exactly the same code, except for a different implementation of the partition() method:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

        int numPartitions = partitions.size();

        int nextValue = nextValue(topic);

        return Utils.toPositive(nextValue) % numPartitions;

    }

With this configuration, messages are distributed across different partitions under both implementations, but some partitions are never used.

I have 50 partitions, and partitions 14 and 34 never receive a message. These partitions are not unavailable; they are available. When I change the partition() method to return 14 or 34 directly, all my messages go to that partition. What could be the problem? Neither implementation works as expected.

Edit 1: I have tried RoundRobinPartitioner with the plain producer, and the result is the same. The producer does not distribute messages evenly among partitions, and some partitions are never used. What could be the reason? It does not look like a missing configuration.

Edit 2: I debugged RoundRobinPartitioner with a breakpoint at the return statement. When I produce just one message, the producer tries to produce it twice. The first attempt is always unsuccessful, and the message does not go to any partition. When I hit continue in the debugger, the counter stored in the ConcurrentMap increases by 1. The producer's second attempt succeeds.

The partition() method is being invoked from somewhere I have not found yet.
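
To make the effect described in Edit 2 concrete, here is a minimal stand-alone simulation. It uses no Kafka classes; the class name DoubleInvocationDemo and its methods are hypothetical and exist only for illustration. It assumes the worst case, in which partition() is called a fixed number of times for every record and only the result of the last call is used. In a real producer the extra call may happen only for some sends, so the set of starved partitions can look different.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Stand-alone simulation of a round-robin counter; not part of Kafka.
class DoubleInvocationDemo {

    // Returns how many records land on each partition when the counter is
    // advanced `callsPerRecord` times for every record that is sent.
    static int[] distribute(int numPartitions, int records, int callsPerRecord) {
        AtomicInteger counter = new AtomicInteger(0);
        int[] hits = new int[numPartitions];
        for (int r = 0; r < records; r++) {
            int chosen = -1;
            for (int c = 0; c < callsPerRecord; c++) {
                // Same arithmetic as Utils.toPositive(nextValue) % numPartitions.
                chosen = (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
            }
            hits[chosen]++; // only the result of the last call is used
        }
        return hits;
    }

    // Counts the partitions that never received a record.
    static int unused(int[] hits) {
        int count = 0;
        for (int h : hits) {
            if (h == 0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("1 call per record:  "
                + unused(distribute(50, 1000, 1)) + " unused partitions");
        System.out.println("2 calls per record: "
                + unused(distribute(50, 1000, 2)) + " unused partitions");
    }
}
```

With one call per record every partition is used. With two calls per record the counter skips every other value, so in this simplified model half of the 50 partitions never receive a record.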

Edit 3: Could this be related to the onNewBatch method, which I did not override?

Edit 4: This implementation works with Kafka client 2.2 but not with 2.4. In 2.2, the Partitioner interface does not have an onNewBatch method. The DefaultPartitioner behavior for null keys also changed between 2.2 and 2.4. Could this be related to sticky partitioning?
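
The difference between the two strategies mentioned in Edit 4 can be sketched without any Kafka classes. This is a simplified model, not the DefaultPartitioner source: the class StickyVsRoundRobinDemo is hypothetical, and the sticky variant is assumed to switch partitions after a fixed number of records, whereas a real producer switches when a new batch is created.

```java
import java.util.Arrays;
import java.util.Random;

// Simplified comparison of two partition-selection strategies; not Kafka code.
class StickyVsRoundRobinDemo {

    // Round-robin: every record goes to the next partition in order.
    static int[] roundRobin(int numPartitions, int records) {
        int[] out = new int[records];
        for (int i = 0; i < records; i++) {
            out[i] = i % numPartitions;
        }
        return out;
    }

    // Sticky (simplified): stay on one partition for `batchSize` records,
    // then move to a different, randomly chosen partition.
    static int[] sticky(int numPartitions, int records, int batchSize, Random random) {
        int[] out = new int[records];
        int current = random.nextInt(numPartitions);
        for (int i = 0; i < records; i++) {
            if (i > 0 && i % batchSize == 0 && numPartitions > 1) {
                int next = current;
                while (next == current) {
                    next = random.nextInt(numPartitions);
                }
                current = next;
            }
            out[i] = current;
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("round-robin: " + Arrays.toString(roundRobin(4, 8)));
        System.out.println("sticky:      " + Arrays.toString(sticky(4, 8, 4, new Random())));
    }
}
```

Round-robin spreads consecutive records over all partitions, while the sticky variant fills one partition per batch. Over many batches both can end up roughly balanced, but the per-record pattern is very different.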

Recommended answer

Use UniformStickyPartitioner.class with the Kafka 2.4 client:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);

RoundRobinPartitioner.class works as expected with Kafka 2.2 or lower. I think the change in behavior is related to the new sticky partitioner.
