How to distribute data evenly in Kafka producing messages through Spark?


Problem Description

I have a streaming job that writes data into Kafka, and I've noticed that one of the Kafka partitions (#3) takes more data than the others.

+-----------------------------------------------------+
| partition | messages  | earliest offset| next offset|
+-----------------------------------------------------+
|1          | 166522754 | 5861603324     | 6028126078 |
|2          | 152251127 | 6010226633     | 6162477760 |
|3          | 382935293 | 6332944925     | 6715880218 |
|4          | 188126274 | 6171311709     | 6359437983 |
|5          | 188270700 | 6100140089     | 6288410789 |
+-----------------------------------------------------+

I found one option: to repartition the output dataset using the number of Kafka partitions (5).

Is there any other way to distribute the data evenly?

Answer

How data is partitioned in Kafka does not depend on how the data is partitioned in Spark and its Dataset. From Kafka's perspective, it depends on the keys of the messages, or on a custom Partitioner class that you apply when writing to Kafka.

There are the following scenarios for how data is partitioned in Kafka:

If no key is defined in the Kafka messages, Kafka will distribute the messages in a round-robin fashion across all partitions. (Since Kafka 2.4 the default for keyless messages is the sticky partitioner, which fills a batch for one partition before moving on to the next; over many batches the distribution is still even.)
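The round-robin idea can be sketched as follows. This is only an illustration of the assignment rule, not Kafka's actual implementation; the real logic lives inside the producer and needs no user code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of round-robin partition assignment for keyless
// messages: each successive message goes to the next partition, wrapping
// around once the last partition is reached.
public class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int nextPartition(int numPartitions) {
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 7; i++) {
            System.out.println("message " + i + " -> partition " + rr.nextPartition(5));
        }
    }
}
```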

If you provide a message key, by default Kafka will decide on the partition based on:

hash(key) % number_of_partitions
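A minimal sketch of this rule is shown below. Note that Kafka's DefaultPartitioner actually applies a murmur2 hash to the serialized key bytes; String.hashCode() here is only a stand-in to illustrate the modulo step:

```java
// Sketch of the default keyed partitioning rule:
// hash(key) % number_of_partitions. Not Kafka's real hash function.
public class KeyedPartitionSketch {
    public static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, so a few hot keys
        // can make one partition (like #3 in the question) receive more data.
        System.out.println("user-42 -> partition " + partitionFor("user-42", 5));
        System.out.println("user-42 -> partition " + partitionFor("user-42", 5));
        System.out.println("user-99 -> partition " + partitionFor("user-99", 5));
    }
}
```

Because the mapping from key to partition is fixed, a skewed key distribution in the input data shows up directly as a skewed partition, regardless of how the Spark Dataset is repartitioned.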

Providing a custom Partitioner

If you want full control over how Kafka stores messages in the partitions of a topic, you can write your own Partitioner class and set it as partitioner.class in your producer configuration.

Here is an example of what a custom Partitioner class could look like:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {}
  public void close() {}

  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceof String)))
      throw new InvalidRecordException("Record did not have a string key");

    if (((String) key).equals("myKey"))
      return 0; // This key will always go to partition 0

    // All other records are spread over the remaining partitions
    // using a hash of the key bytes
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
  }
}
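To wire the partitioner in, set partitioner.class in the producer configuration. A sketch follows; "com.example.MyPartitioner" is an assumed fully qualified name for the class above, so adjust it to your own package (the KafkaProducer construction is left as a comment since it needs a reachable broker):

```java
import java.util.Properties;

// Sketch of registering a custom partitioner with a Kafka producer.
public class ProducerConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed package name for the MyPartitioner class shown above.
        props.put("partitioner.class", "com.example.MyPartitioner");
        return props;
    }

    public static void main(String[] args) {
        // With a running broker you would then create the producer:
        // KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```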

