How to distribute data evenly in Kafka producing messages through Spark?
Problem description
I have a streaming job that writes data into Kafka, and I've noticed that one of the Kafka partitions (#3) takes more data than the others.
+-----------+-----------+-----------------+-------------+
| partition | messages  | earliest offset | next offset |
+-----------+-----------+-----------------+-------------+
| 1         | 166522754 | 5861603324      | 6028126078  |
| 2         | 152251127 | 6010226633      | 6162477760  |
| 3         | 382935293 | 6332944925      | 6715880218  |
| 4         | 188126274 | 6171311709      | 6359437983  |
| 5         | 188270700 | 6100140089      | 6288410789  |
+-----------+-----------+-----------------+-------------+
I found one option - to repartition the output dataset using the number of Kafka partitions (5).
Is there any other way to distribute the data evenly?
Recommended answer
How data is partitioned in Kafka does not depend on how the data is partitioned in Spark and its Dataset. From Kafka's perspective, it depends on the keys of the messages, or on a custom Partitioner class that you apply when writing to Kafka.
There are the following scenarios for how data is partitioned in Kafka:
If no key is defined in the Kafka messages, Kafka will distribute the messages in a round-robin fashion across all partitions.
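The round-robin scheme can be illustrated with a plain counter. This is a simplified sketch of the idea, not Kafka's actual implementation (which lives inside the producer's default partitioner):

```java
public class RoundRobinSketch {
    private int counter = 0;
    private final int numPartitions;

    public RoundRobinSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Each keyless record simply goes to the next partition in turn.
    public int nextPartition() {
        return counter++ % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(5);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sb.append(rr.nextPartition()).append(' ');
        }
        System.out.println(sb.toString().trim()); // 0 1 2 3 4 0 1 2 3 4
    }
}
```

With keyless messages every partition receives the same share of records, which is why the skew in the table above suggests that keyed writes (or an uneven key distribution) are involved.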
If you provide a message key, by default Kafka will decide on the partition based on
hash(key) % number_of_partitions
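As an illustration of that scheme (using `String.hashCode` purely for demonstration; Kafka's default partitioner actually applies murmur2 to the serialized key bytes):

```java
public class KeyHashSketch {
    // Map a key to a partition as hash(key) % number_of_partitions describes.
    // Illustration only: Kafka really hashes the key *bytes* with murmur2.
    public static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int p = KeyHashSketch.partitionFor("user-42", 5);
        // The same key always maps to the same partition:
        System.out.println(p == KeyHashSketch.partitionFor("user-42", 5)); // true
    }
}
```

The important consequence is that identical keys always land in the same partition, so a few very frequent keys can create exactly the kind of skew shown in the table above.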
Providing a custom partitioner
If you want full control over how Kafka stores messages in the partitions of a topic, you can write your own Partitioner class and set it as partitioner.class in your producer configuration.
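Setting this in the producer configuration might look like the sketch below (com.example.MyPartitioner is a placeholder for your own class name, and the broker address is assumed):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Point the producer at the custom partitioner class:
        props.put("partitioner.class", "com.example.MyPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```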
Here is an example of what a custom partitioner class could look like:
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public void close() {}

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("Record did not have a string key");
        if (((String) key).equals("myKey"))
            return 0; // this key will always go to partition 0
        // Other records go to the remaining partitions using a hashing function
        return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
    }
}