Consumer reading __consumer_offsets delivers unreadable messages


Question


I am trying to consume from the __consumer_offsets topic, as it seemed this may be the easiest way to retrieve Kafka metrics about consumers, like message lag. The ideal way is accessing them over JMX, but I wanted to try this first, and the messages that come back appear to be encrypted or otherwise unreadable. I tried adding the StringDeserializer property as well. Does anyone have any suggestions on how to correct this? Again, the reference to this being a duplicate of

duplicate consumer_offset

is not helpful, as it does not address my issue, which is reading the messages as strings in Java. I also updated the code to try a ConsumerRecord using the kafka.client consumer.

Properties consumerProps = new Properties();
consumerProps.put("exclude.internal.topics", "false");
consumerProps.put("group.id", groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

for (KafkaStream<byte[], byte[]> stream : streams) {

    ConsumerIterator<byte[], byte[]> it = stream.iterator();

    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

    while (it.hasNext()) {
        try {
            String mesg = new String(it.next().message());
            System.out.println(mesg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Code changes:

try {       
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");   
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics",  false);
    consumerProps.put("group.id" , "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");  
    consumerProps.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //       consumerConfig);

    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps); 
    kconsumer.subscribe(Arrays.asList(topic)); 

    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);

            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
        }
    } finally {
        kconsumer.close();
    }
} catch (Exception e) {
    e.printStackTrace();
}

A snapshot of what the messages look like: the printed values come out as unreadable binary data. (Screenshot not preserved.)

Solution

While it's possible to read directly from the __consumer_offsets topic, this is not the recommended or easiest method.

If you can use Kafka 2.0, the best option is to use the AdminClient API to describe groups:
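A minimal sketch of that approach (the broker address `localhost:9092` and group id `my-group` are placeholders, and it assumes the `kafka-clients` 2.0+ dependency is on the classpath):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetsExample {

    // Pure helper that formats one committed-offset entry
    static String describe(String group, String topic, int partition, long offset) {
        return group + " " + topic + "-" + partition + " -> " + offset;
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Fetch the committed offsets for every partition the group consumes
            Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets("my-group")
                     .partitionsToOffsetAndMetadata()
                     .get();
            offsets.forEach((tp, om) -> System.out.println(
                describe("my-group", tp.topic(), tp.partition(), om.offset())));
        }
    }
}
```

Comparing these committed offsets against the log end offsets (for example via `KafkaConsumer#endOffsets`) gives exactly the per-partition lag the question is after, without touching the internal topic.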


If you absolutely want to read directly from __consumer_offsets, you need to decode the records to make them human readable. This can be done using the GroupMetadataManager class:
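In real code you would wrap each record's key and value in a `ByteBuffer` and hand them to `GroupMetadataManager`. Purely as a dependency-free illustration of what that decoding involves, the sketch below hand-parses just the record key, assuming the offset-commit key layout of schema versions 0 and 1 (an int16 version, then group, topic, and partition). This layout is internal to Kafka and can change between versions, so treat it as illustrative only:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetKeyDecoder {

    // Reads an int16-length-prefixed UTF-8 string, as Kafka's internal schemas do
    private static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Decodes an offset-commit key (schema versions 0/1): group, topic, partition
    public static String decodeKey(byte[] key) {
        ByteBuffer buf = ByteBuffer.wrap(key);
        short version = buf.getShort();
        if (version > 1) {
            // Higher key versions hold group metadata, not committed offsets
            return "non-offset key, version " + version;
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return group + ":" + topic + ":" + partition;
    }
}
```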

This answer from the question you linked contains skeleton code to perform all that.

Also note that you should not deserialize the records as strings, but instead keep them as raw bytes so that these methods can decode them correctly.
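In the asker's updated code, that means swapping the StringDeserializer for ByteArrayDeserializer (and typing the consumer as `KafkaConsumer<byte[], byte[]>`). A sketch of the configuration, using the same property names as the code above:

```java
import java.util.Properties;

public class RawConsumerProps {

    // Builds consumer properties that leave keys and values as raw bytes
    public static Properties build(String servers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        // Required so the consumer may subscribe to internal topics like __consumer_offsets
        props.put("exclude.internal.topics", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```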
