consumer reading __consumer_offsets delivers unreadable message


Problem Description

I am trying to consume from the __consumer_offsets topic, as this seemed to be the easiest way to retrieve Kafka metrics about consumers, such as message lag. The ideal way is to access them via JMX, but I wanted to try this first. The messages that come back appear to be encrypted or otherwise unreadable. I tried adding the StringDeserializer property as well. Does anyone have any suggestions on how to correct this? Also, the reference to this being a duplicate of

duplicate consumer_offset

is not helpful, as it does not address my issue, which is to read the messages as strings in Java. I have also updated the code to try a ConsumerRecord using the kafka.client consumer.

    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics", false);
    consumerProps.put("group.id", groupId);
    consumerProps.put("zookeeper.connect", zooKeeper);

    consumerProps.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    for (KafkaStream stream : streams) {

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");

        while (it.hasNext()) {
            try {
                String mesg = new String(it.next().message());
                System.out.println(mesg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

Code changes:

    try {
        // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
        Properties consumerProps = new Properties();
        consumerProps.put("exclude.internal.topics", false);
        consumerProps.put("group.id", "test");
        consumerProps.put("bootstrap.servers", servers);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
        kconsumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = kconsumer.poll(10);

                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ": " + record.value());
            }
        } finally {
            kconsumer.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    

And a snapshot of what the messages look like:

[screenshot: consumer output showing unreadable binary values]

Solution

While it's possible to read directly from the __consumer_offsets topic, this is not the recommended or easiest method.

If you can use Kafka 2.0, the best option is to use the AdminClient API to describe consumer groups.
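
As an illustration, here is a minimal sketch of that approach (the broker address, group id, and class name are placeholders): it reads the group's committed offsets with AdminClient.listConsumerGroupOffsets and compares them against the log-end offsets from a short-lived consumer to compute per-partition lag.

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class GroupLag {
        public static void main(String[] args) throws Exception {
            String servers = "localhost:9092"; // placeholder broker address
            String groupId = "test";           // placeholder group to inspect

            Properties adminProps = new Properties();
            adminProps.put("bootstrap.servers", servers);

            try (AdminClient admin = AdminClient.create(adminProps)) {
                // Committed offsets for the group -- the data stored in __consumer_offsets
                Map<TopicPartition, OffsetAndMetadata> committed = admin
                    .listConsumerGroupOffsets(groupId)
                    .partitionsToOffsetAndMetadata()
                    .get();

                // Log-end offsets, fetched with a short-lived consumer
                Properties consumerProps = new Properties();
                consumerProps.put("bootstrap.servers", servers);
                consumerProps.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                consumerProps.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

                try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                    Map<TopicPartition, Long> ends = consumer.endOffsets(committed.keySet());
                    for (Map.Entry<TopicPartition, OffsetAndMetadata> e : committed.entrySet()) {
                        long lag = ends.get(e.getKey()) - e.getValue().offset();
                        System.out.println(e.getKey() + " committed=" + e.getValue().offset()
                            + " lag=" + lag);
                    }
                }
            }
        }
    }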


If you absolutely want to read directly from __consumer_offsets, you need to decode the records to make them human-readable. This can be done using the GroupMetadataManager class:
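
Note that GroupMetadataManager lives in the broker's kafka.coordinator.group package; it is internal code, not public API, so the exact signatures may differ between versions. A rough sketch of decoding a raw record, assuming Kafka 2.0's classes are on the classpath, could look like this:

    import java.nio.ByteBuffer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import kafka.coordinator.group.BaseKey;
    import kafka.coordinator.group.GroupMetadataManager;
    import kafka.coordinator.group.OffsetKey;

    // Decode one raw record read from __consumer_offsets (key and value as byte[])
    static void printRecord(ConsumerRecord<byte[], byte[]> record) {
        BaseKey key = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
        if (key instanceof OffsetKey) {
            // Offset commit record; a null value is a tombstone for an expired offset
            if (record.value() != null) {
                System.out.println(key.key() + " => "
                    + GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value())));
            }
        } else {
            // Group metadata record (member list, assignments, etc.)
            System.out.println(key.key());
        }
    }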

This answer from the question you linked contains skeleton code to perform all of that.

Also note that you should not deserialize the records as strings; instead, keep them as raw bytes so that these methods can decode them correctly.
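
Concretely, that means configuring the consumer that reads __consumer_offsets with the ByteArrayDeserializer instead of the StringDeserializer, for example:

    // Keep keys and values as raw bytes so the decoding methods above can parse them
    consumerProps.put("exclude.internal.topics", false);
    consumerProps.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProps.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    KafkaConsumer<byte[], byte[]> kconsumer = new KafkaConsumer<>(consumerProps);
    kconsumer.subscribe(Arrays.asList("__consumer_offsets"));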
