Kafka partition key not working properly

Problem description

I'm struggling with how to use the partition key mechanism properly. My logic is to set the number of partitions to 3, create three partition keys "0", "1" and "2", and then use those keys to create three KeyedMessages, such as:

  • KeyedMessage(topic, "0", message)
  • KeyedMessage(topic, "1", message)
  • KeyedMessage(topic, "2", message)

After this, I create a producer instance to send all the KeyedMessages.

I expect each KeyedMessage to go to a different partition according to its partition key, which means:

  • KeyedMessage(topic, "0", message) goes to partition 0
  • KeyedMessage(topic, "1", message) goes to partition 1
  • KeyedMessage(topic, "2", message) goes to partition 2

I'm using Kafka-web-console to watch the topic status, but the result is not what I expect. The KeyedMessages still go to partitions seemingly at random. Sometimes two KeyedMessages even enter the same partition despite having different partition keys.

To make my question clearer, here is some of my current Scala code. I'm using Kafka 0.8.2-beta and Scala 2.10.4.

Here is the producer code. I'm not using a custom partitioner.class:

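  import java.util.Properties
  import kafka.message.{DefaultCompressionCodec, NoCompressionCodec}
  import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

  // No "partitioner.class" is set below, so the producer's default partitioner is used
  val props = new Properties()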

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }

And here is how I use the producer. I create a producer instance, then use it to send three messages. Currently I create each partition key as an Integer and then convert it to a byte array:

  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0, 1, 2)
  for (a <- numList) {
    // Create the partition key as a byte array (a 4-byte, big-endian Int)
    val key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    // Passing an Array[Byte] key selects the producer's
    // send(Array[Byte], Array[Byte]) overload
    producer.send(testMessage.getBytes("UTF8"), key)
  }
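The sketch below shows what the loop above actually passes as a key. It has no Kafka dependency, and the names KeyBytesDemo and keyFor are hypothetical. It reuses the same ByteBuffer encoding. It shows that two arrays holding identical bytes are not equal under the JVM's default equals/hashCode, which are reference-based for arrays. By contrast, java.util.Arrays compares and hashes the contents.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Standalone sketch (no Kafka dependency): how the question's keys look
// and how JVM byte arrays behave when used as keys.
object KeyBytesDemo {
  // Same encoding as the loop above: a 4-byte, big-endian Int.
  def keyFor(n: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(n).array()

  def main(args: Array[String]): Unit = {
    val a = keyFor(1)
    val b = keyFor(1) // a second array with identical contents

    println(a.mkString("[", ", ", "]"))               // [0, 0, 0, 1]
    println(a.equals(b))                              // false: arrays use reference equality
    println(Arrays.equals(a, b))                      // true: contents are compared
    println(Arrays.hashCode(a) == Arrays.hashCode(b)) // true: content-based hash
    // a.hashCode and b.hashCode are identity hashes and usually differ
    println(s"identity hashes: ${a.hashCode}, ${b.hashCode}")
  }
}
```

Anything that hashes such a key with plain key.hashCode therefore treats every newly allocated array as a different key, even when the bytes are the same.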

I'm not sure whether my logic is wrong or I've misunderstood the partition key mechanism. Sample code or an explanation would be great!

Recommended answer

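I had the same issue. Just switch to the ByteArrayPartitioner. With no partitioner.class set, the default partitioner hashes the key with key.hashCode. For an Array[Byte], that is an identity hash, not a hash of the bytes, so keys with equal contents can land on different partitions. ByteArrayPartitioner hashes the array contents instead: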

props.put("partitioner.class", "kafka.producer.ByteArrayPartitioner")
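The sketch below works through the partition arithmetic for the question's three keys, with no Kafka dependency. It assumes that ByteArrayPartitioner computes a non-negative java.util.Arrays.hashCode(key) modulo the partition count. PartitionMath, nonNegative and directPartition are hypothetical names, not Kafka APIs. nonNegative is only an assumed stand-in for Kafka's internal absolute-value helper.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Standalone sketch of the partition arithmetic (assumptions noted inline).
object PartitionMath {
  // Same key encoding as the question: a 4-byte, big-endian Int.
  def keyFor(n: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(n).array()

  // Assumed stand-in for Kafka's internal abs helper (not its actual code).
  def nonNegative(h: Int): Int = if (h == Int.MinValue) 0 else math.abs(h)

  // Assumed ByteArrayPartitioner behaviour: hash the bytes, not the reference,
  // so equal key contents always map to the same partition.
  def byteArrayPartition(key: Array[Byte], numPartitions: Int): Int =
    nonNegative(Arrays.hashCode(key)) % numPartitions

  // Hypothetical alternative: decode the key back to an Int and use it
  // directly, which is the mapping the question expected.
  def directPartition(key: Array[Byte], numPartitions: Int): Int =
    nonNegative(ByteBuffer.wrap(key).getInt) % numPartitions

  def main(args: Array[String]): Unit = {
    for (n <- 0 to 2) {
      val key = keyFor(n)
      println(s"key $n -> hashed: ${byteArrayPartition(key, 3)}, direct: ${directPartition(key, 3)}")
    }
    // key 0 -> hashed: 1, direct: 0
    // key 1 -> hashed: 2, direct: 1
    // key 2 -> hashed: 0, direct: 2
  }
}
```

Arrays.hashCode of [0, 0, 0, n] is 31^4 + n = 923521 + n, and 923521 % 3 == 1. With 3 partitions, keys 0, 1 and 2 therefore land on partitions 1, 2 and 0. Each key now has a stable partition, but not the partition whose number equals the key. If the literal "key N goes to partition N" mapping is required, a custom class set through partitioner.class would need logic like directPartition. In the 0.8 producer this is presumably a class implementing the kafka.producer.Partitioner trait, which is worth confirming against the 0.8.2 sources.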
