How to distribute data evenly in Kafka producing messages through Spark?


Question

I have a streaming job that writes data into Kafka, and I've noticed that one of the Kafka partitions (#3) takes more data than the others.

+-----------+-----------+-----------------+-------------+
| partition | messages  | earliest offset | next offset |
+-----------+-----------+-----------------+-------------+
| 1         | 166522754 | 5861603324      | 6028126078  |
| 2         | 152251127 | 6010226633      | 6162477760  |
| 3         | 382935293 | 6332944925      | 6715880218  |
| 4         | 188126274 | 6171311709      | 6359437983  |
| 5         | 188270700 | 6100140089      | 6288410789  |
+-----------+-----------+-----------------+-------------+

One option I found is to repartition the output Dataset to match the number of Kafka partitions (5).

Is there any other way to distribute the data evenly?

Answer

How data is partitioned in Kafka does not depend on how the data is partitioned in Spark and its Dataset. From Kafka's perspective, it depends on the keys of the messages, or on a custom Partitioner class applied when writing to Kafka.

These are the scenarios for how data gets partitioned in Kafka:

If no key is defined in the Kafka messages, Kafka will distribute the messages in a round-robin fashion across all partitions.
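As a rough illustration only (not Kafka's actual implementation, which is more involved and, in newer clients, batches keyless records with a "sticky" strategy), keyless round-robin assignment can be sketched as:

```java
// Hypothetical sketch of round-robin partition selection for keyless records.
public class RoundRobinSketch {
    private int counter = 0;

    // Cycle through partitions 0..numPartitions-1 in order.
    public int nextPartition(int numPartitions) {
        int p = counter % numPartitions;
        counter++;
        return p;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 6; i++) {
            // With 5 partitions, successive records go to 0, 1, 2, 3, 4, 0, ...
            System.out.println(rr.nextPartition(5));
        }
    }
}
```

With enough keyless messages, each partition ends up with roughly the same share, which is why the skew in the question points at keyed writes or a custom partitioner.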

If you provide a message key, by default Kafka will decide on the partition based on

hash(key) % number_of_partitions
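As a rough sketch of this formula (Kafka's real default partitioner hashes the serialized key bytes with murmur2; `String.hashCode` stands in here purely for illustration):

```java
public class KeyHashSketch {
    // Illustration only: Kafka's DefaultPartitioner uses murmur2 over the
    // serialized key bytes, not String.hashCode.
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is always a valid partition index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The key property: the same key always maps to the same partition,
        // so a few very hot keys can overload one partition.
        System.out.println(partitionFor("user-42", 5));
        System.out.println(partitionFor("user-42", 5));
    }
}
```

This is also the likely cause of the skew in the question: if many records share a key (or keys hash unevenly), one partition such as #3 will receive disproportionately more data.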

Provide a custom partitioner

If you want full control over how Kafka stores messages in the partitions of a topic, you can write your own Partitioner class and set it as partitioner.class in the Producer configuration.

Here is an example of what a custom partitioner class could look like:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
  @Override
  public void configure(Map<String, ?> configs) {}

  @Override
  public void close() {}

  @Override
  public int partition(String topic, Object key, byte[] keyBytes,
                       Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();

    if ((keyBytes == null) || (!(key instanceof String)))
      throw new InvalidRecordException("Record did not have a string key");

    if (((String) key).equals("myKey"))
      return 0; // This key will always go to partition 0

    // All other records are spread over the remaining partitions by hash
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) + 1;
  }
}
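To wire the custom partitioner into a producer, you set its fully qualified class name in the producer configuration. A minimal sketch, assuming the class above lives in a hypothetical `com.example` package and a broker at `localhost:9092`:

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Sketch of the producer configuration that activates a custom
    // partitioner; "com.example.MyPartitioner" and the broker address
    // are assumptions for illustration.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // This is the setting the answer refers to:
        props.put("partitioner.class", "com.example.MyPartitioner");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("partitioner.class"));
    }
}
```

These properties would then be passed to the `KafkaProducer` constructor; from that point on, Kafka calls your `partition()` method for every record instead of the default hash-based logic.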

