Kafka Streams RoundRobinPartitioner
Problem description
I wrote a Kafka Streams application that uses Kafka client version 2.4 against Kafka server version 2.2. My topic and the internal topic each have 50 partitions.
My Kafka Streams code has a selectKey() DSL operation, and I have 2 million records that share the same key. In the streams configuration, I have set
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
so that records with exactly the same key can be spread over different partitions. If I don't use round robin, all my messages go to the same partition, as expected.
Everything was fine until I noticed that, with the RoundRobinPartitioner class, my messages go to only about 40 partitions; the other 10 partitions stay idle. What am I missing? With about 2 million records, all 50 partitions should be used, right?
final KStream<String, IdListExportMessage> exportedDeviceIdsStream =
        builder.stream("deviceIds");

// k: appId::deviceId, v: device
final KTable<String, Device> deviceTable = builder.table(
        "device",
        Consumed.with(Serdes.String(), deviceSerde)
);

        // Some DSL operations
        .join(
                deviceTable,
                (exportedDevice, device) -> {
                    exportedDevice.setDevice(device);
                    return exportedDevice;
                },
                Joined.with(Serdes.String(), exportedDeviceSerde, deviceSerde)
        )
        .selectKey((deviceId, exportedDevice) -> exportedDevice.getDevice().getId())
        .to("bulk_consumer");
And the rest of the configuration:
props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put("num.stream.threads", 10);
props.put("application.id", applicationId);
RoundRobinPartitioner.java
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // Rotate over the partitions that currently have an available leader.
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partition is available: rotate over all partitions of the topic.
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = this.topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    public void close() {
    }
}
Recommended answer
You cannot change the partitioning using the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration; this only works for the plain producer.
In Kafka Streams, you need to implement the StreamPartitioner interface and pass your implementation to the corresponding operators, e.g., to("topic", Produced.streamPartitioner(new MyPartitioner())).
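As a rough illustration of the answer above, here is a minimal sketch of the round-robin logic such a partitioner could use. The class name RoundRobinStreamPartitioner is hypothetical and is not part of Kafka. The partition method uses the same parameter list as StreamPartitioner#partition in the 2.4 client (topic, key, value, numPartitions), but the `implements StreamPartitioner<K, V>` clause is deliberately left out so the sketch compiles and runs without Kafka on the classpath. Treat it as a starting point under those assumptions, not as a reference implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, not part of Kafka. To use it in a topology, declare
// `implements StreamPartitioner<K, V>` (org.apache.kafka.streams.processor.StreamPartitioner)
// and mark partition() with @Override. The interface is omitted here so that
// the sketch runs with the JDK alone.
final class RoundRobinStreamPartitioner<K, V> {

    // One counter per topic; safe to share between threads.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Ignores key and value and simply rotates over the partitions of the topic.
    public Integer partition(String topic, K key, V value, int numPartitions) {
        AtomicInteger counter = counters.computeIfAbsent(topic, t -> new AtomicInteger(0));
        // floorMod keeps the result in [0, numPartitions) even after the int counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinStreamPartitioner<String, String> partitioner = new RoundRobinStreamPartitioner<>();
        int[] hits = new int[50];
        // 2 million records with the same key, spread over 50 partitions.
        for (int i = 0; i < 2_000_000; i++) {
            hits[partitioner.partition("bulk_consumer", "same-key", "value", 50)]++;
        }
        System.out.println(hits[0] + " " + hits[49]); // prints "40000 40000"
    }
}
```

Once the interface is declared, an instance would be handed to the sink operator, for example .to("bulk_consumer", Produced.streamPartitioner(new RoundRobinStreamPartitioner<>())). Note that this sketch counts per partitioner instance, so the distribution is only even within one application instance.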