Kafka Streams RoundRobinPartitioner

Question

I wrote a Kafka Streams application that uses Kafka client version 2.4 against a Kafka 2.2 broker. Both my topic and the internal topic have 50 partitions.

My Kafka Streams code has a selectKey() DSL operation, and I have 2 million records with the same key. In the streams configuration, I have set:

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);

This lets records with exactly the same key go to different partitions. Without the round-robin partitioner, all my messages go to the same partition, as expected.
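To illustrate why identical keys end up on a single partition, here is a small Kafka-free simulation of key-hash partitioning. It is a sketch under an explicit assumption: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this demo uses `String.hashCode()` as a stand-in, and the class name `KeyHashDemo` and the key `"device-42"` are hypothetical. Only the relevant property is modeled: the partition is a pure function of the key, so 2 million records sharing one key all land in the same partition.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Simulation only, not Kafka code: key-hash partitioning sends every record
 * with the same key to the same partition.
 * ASSUMPTION: String.hashCode() stands in for Kafka's murmur2 hash of the key bytes.
 */
public class KeyHashDemo {
    // Clear the sign bit (as Kafka's Utils.toPositive does), then take the modulo.
    public static int partitionForKey(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Collects the distinct partitions used when `records` records all share `key`.
    public static Set<Integer> partitionsUsed(String key, int records, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < records; i++) {
            used.add(partitionForKey(key, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        // 2 million records with one key over 50 partitions: prints 1
        System.out.println(partitionsUsed("device-42", 2_000_000, 50).size());
    }
}
```

Whatever hash function is used, the result is the same: one key, one partition. Spreading a single key requires a partitioner that ignores the key.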

Everything seemed fine until I noticed that, when I use the RoundRobinPartitioner class, my messages go to only about 40 partitions, while the remaining 10 partitions stay idle. What am I missing? With about 2 million records, shouldn't all 50 partitions be used?

final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
    builder.stream("deviceIds");

// k: appId::deviceId, v: device
final KTable<String, Device> deviceTable = builder.table(
    "device",
    Consumed.with(Serdes.String(), deviceSerde)
);

exportedDeviceIdsStream
    // Some DSL operations
    .join(
        deviceTable,
        (exportedDevice, device) -> {
            exportedDevice.setDevice(device);
            return exportedDevice;
        },
        Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
    )
    // Re-key each record by the joined device's id
    .selectKey((deviceId, exportedDevice) -> exportedDevice.getDevice().getId())
    .to("bulk_consumer");

props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);

RoundRobinPartitioner.java

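import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RoundRobinPartitioner implements Partitioner {
    // One counter per topic, shared by all sends through this partitioner instance
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // Cycle over partitions that currently have a leader; the key is ignored
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partition is available: fall back to cycling over all partitions
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    @Override
    public void close() {
    }
}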
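The counter logic of the class above can be exercised without a broker. The following sketch (hypothetical class `RoundRobinCounterDemo`) copies the per-topic `AtomicInteger` and the sign-bit masking that `Utils.toPositive` performs, under the simplifying assumption that all partitions are available and the counter advances exactly once per record. Under those assumptions, 2,000,000 records over 50 partitions give each partition 2,000,000 / 50 = 40,000 records, which is the even spread the question expects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Kafka-free sketch of RoundRobinPartitioner's counter logic.
 * ASSUMPTION: every partition is available and the counter advances once per record.
 */
public class RoundRobinCounterDemo {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Same masking as Utils.toPositive: clears the sign bit so an overflowed counter stays non-negative.
    public static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Next partition for `topic`, cycling 0, 1, ..., numPartitions - 1.
    public int partition(String topic, int numPartitions) {
        int next = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndIncrement();
        return toPositive(next) % numPartitions;
    }

    // Counts how many of `records` sends land on each partition.
    public static int[] histogram(int records, int numPartitions) {
        RoundRobinCounterDemo partitioner = new RoundRobinCounterDemo();
        int[] counts = new int[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[partitioner.partition("bulk_consumer", numPartitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = histogram(2_000_000, 50);
        // Strict round robin: prints "40000 40000"
        System.out.println(counts[0] + " " + counts[49]);
    }
}
```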

Answer

You cannot change the partitioning via the ProducerConfig.PARTITIONER_CLASS_CONFIG setting; it only works for the plain producer.

In Kafka Streams, you need to implement the StreamPartitioner interface and pass your implementation to the corresponding operators, e.g., to("topic", Produced.streamPartitioner(new MyPartitioner())).
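A minimal sketch of such a partitioner, assuming the Kafka Streams 2.4 `StreamPartitioner<K, V>` interface from `org.apache.kafka.streams.processor` (single method `Integer partition(String topic, K key, V value, int numPartitions)`) and kafka-streams on the classpath. The class name `RoundRobinStreamPartitioner` is hypothetical; it ignores the key and cycles through the sink topic's partitions.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.streams.processor.StreamPartitioner;

/**
 * Hypothetical round-robin StreamPartitioner: ignores the key and
 * cycles through the sink topic's partitions.
 */
public class RoundRobinStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
    // AtomicInteger because the same instance may be called from several stream threads.
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        // Clear the sign bit so the index stays valid after the counter overflows.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinStreamPartitioner<String, String> p = new RoundRobinStreamPartitioner<>();
        // Same key every time, yet prints 0, 1, 2, 0 on separate lines
        for (int i = 0; i < 4; i++) {
            System.out.println(p.partition("bulk_consumer", "same-key", "v" + i, 3));
        }
    }
}
```

In the question's topology it could be wired in as `.to("bulk_consumer", Produced.streamPartitioner(new RoundRobinStreamPartitioner<>()))`. The trade-off is that records with the same key no longer share a partition, so consumers of `bulk_consumer` lose per-key ordering.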
