Kafka 分区键无法正常工作 [英] Kafka partition key not working properly‏

查看:20
本文介绍了Kafka 分区键无法正常工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为如何正确使用分区键机制而苦苦挣扎.我的逻辑是设置分区号为3,然后创建三个分区键为0"、1"、2",然后使用分区键创建三个KeyedMessage如

I'm struggling with how to use the partition key mechanism properly. My logic is set the partition number as 3, then create three partition keys as "0", "1", "2", then use the partition keys to create three KeyedMessage such as

  • KeyedMessage(topic, "0", message)
  • KeyedMessage(topic, "1", message)
  • KeyedMessage(topic, "2", message)

在此之后,创建一个生产者实例来发送所有的 KeyedMessage.

After this, creating a producer instance to send out all the KeyedMessage.

我期望每个KeyedMessage应该根据不同的分区键进入不同的分区,这意味着

I expecting each KeyedMessage should enter to different partitions according to the different partition keys, which means

  • KeyedMessage(topic, "0", message) 转到分区 0
  • KeyedMessage(topic, "1", message) 转到分区 1
  • KeyedMessage(topic, "2", message) 转到分区 2

我正在使用 Kafka-web-console 来查看主题状态,但结果并不像我期望的那样.KeyedMessage 仍然会随机进入分区,有时两个 KeyedMessage 会进入同一个分区,即使它们的分区键不同.

I'm using Kafka-web-console to watch the topic status, but the result is not like what I'm expecting. KeyedMessage still go to partitions randomly, some times two KeyedMessage will enter the same partition even they have different partition keys.

为了让我的问题更清楚,我想发布一些我目前拥有的 Scala 代码,我使用的是 Kafka 0.8.2-beta 和 Scala 2.10.4.

To make my question more clear, I would like to post some Scala codes currently I have, and I'm using Kafka 0.8.2-beta, and Scala 2.10.4.

这是生产者代码,我没有使用自定义 partitioner.class :

  val props = new Properties()

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }

这里是我如何使用生产者,创建一个生产者实例,然后使用这个实例发送三个消息.目前我将分区键创建为整数,然后将其转换为字节数组:

And here is how I use the producer, create a producer instance, then use this instance to send three message. Currently I create the partition key as Integer, then convert it to Byte Arrays:

  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0,1,2);
  for (a <- numList) {
    // Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"), key)
  }

不确定是我的逻辑不正确还是我没有正确理解分区键机制.任何人都可以提供一些示例代码或解释会很棒!

Not sure whether my logic is incorrect or I didn't understand the partition key mechanism correctly. Anyone could provides some sample code or explanation would be great!

推荐答案

有同样的问题 - 只需切换到 ByteArrayParitioner:

Had the same issue - just switch to the ByteArrayParitioner:

props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")

这篇关于Kafka 分区键无法正常工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆