Consumer reading __consumer_offsets delivers unreadable message
Problem description
I am trying to consume from the __consumer_offsets topic, as it seemed this might be the easiest way to retrieve Kafka metrics about consumers, such as message lag. The ideal way is to access them via JMX, but I wanted to try this first, and the messages that come back seem to be encrypted or otherwise unreadable. I tried adding the StringDeserializer property as well. Does anyone have any suggestions on how to correct this? Again, marking this as a duplicate of the linked question is not helpful, as it does not address my issue, which is reading the message as a string in Java. I have also updated the code to try a ConsumerRecord using the kafka.clients consumer.
consumerProps.put("exclude.internal.topics", false);
consumerProps.put("group.id", groupId);
consumerProps.put("zookeeper.connect", zooKeeper);
consumerProps.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
ConsumerConnector consumer =
    kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
    consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (KafkaStream<byte[], byte[]> stream : streams) {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    //errorReporting("...CONSUMER-KAFKA CONNECTION SUCCESSFUL!");
    while (it.hasNext()) {
        try {
            String mesg = new String(it.next().message());
            System.out.println(mesg);
Updated code:
try {
    // errorReporting("CONSUMER-KAFKA CONNECTION INITIATING...");
    Properties consumerProps = new Properties();
    consumerProps.put("exclude.internal.topics", false);
    consumerProps.put("group.id", "test");
    consumerProps.put("bootstrap.servers", servers);
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    //ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    //ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    //    consumerConfig);
    //Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    //topicCountMap.put(topic, new Integer(1));
    //Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    //List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    KafkaConsumer<String, String> kconsumer = new KafkaConsumer<>(consumerProps);
    kconsumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, String> records = kconsumer.poll(10);
            for (ConsumerRecord<String, String> record : records)
                System.out.println(record.offset() + ": " + record.value());
        }
    } finally {
        kconsumer.close();
    }
A snapshot of what the messages look like is below, at the bottom of the image:
While it's possible to read directly from the __consumer_offsets topic, this is neither the recommended nor the easiest method.
If you can use Kafka 2.0 or later, the best option is to use the AdminClient APIs to describe groups:
- listConsumerGroupOffsets(): to find all offsets for a specific group
- describeConsumerGroups(): to find details about members of a group
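Since the question mentions message lag, here is a minimal sketch of the arithmetic involved once the offsets have been fetched. It assumes the committed offsets come from AdminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get() and the log end offsets from KafkaConsumer.endOffsets(partitions). To keep the sketch free of external dependencies, both are represented here as plain maps keyed by a "topic-partition" string, and the class name ConsumerLag is purely illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ConsumerLag {

    /**
     * Computes lag per partition as (log end offset - committed offset).
     * Keys are "topic-partition" strings for illustration only; with the real
     * client they would be TopicPartition objects.
     */
    public static Map<String, Long> lagPerPartition(Map<String, Long> committed,
                                                    Map<String, Long> endOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> end : endOffsets.entrySet()) {
            Long committedOffset = committed.get(end.getKey());
            if (committedOffset == null) {
                // No committed offset for this partition: lag is unknown, so skip it
                continue;
            }
            // Clamp at zero in case the two snapshots were taken at slightly different times
            lag.put(end.getKey(), Math.max(0L, end.getValue() - committedOffset));
        }
        return lag;
    }

    public static void main(String[] args) {
        // Hypothetical numbers standing in for values returned by the Kafka clients
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 120L);
        committed.put("orders-1", 95L);

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("orders-0", 150L);
        endOffsets.put("orders-1", 95L);

        System.out.println(lagPerPartition(committed, endOffsets));
    }
}
```

Skipping partitions without a committed offset is a design choice of this sketch; a monitoring tool might instead report them separately.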
If you absolutely want to read directly from __consumer_offsets, you need to decode the records to make them human-readable. This can be done using the GroupMetadataManager class:
- GroupMetadataManager.readMessageKey() can be used to decode the message key and retrieve the topic-partition this entry refers to. It can return two types of objects; for consumer positions, you are only interested in OffsetKey objects.
- GroupMetadataManager.readOffsetMessageValue() can be used to decode message values (for keys that were OffsetKey) and find the offset information.
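To illustrate why these records look unreadable when printed as text, the following sketch decodes an offset-commit key by hand. It assumes the key layout used for key versions 0 and 1: a 2-byte version, then the group and topic as length-prefixed UTF-8 strings (2-byte length each), then a 4-byte partition number. The class OffsetKeySketch and its methods are hypothetical and exist only for illustration. In real code, GroupMetadataManager.readMessageKey() remains the safer route, since the layout may differ between Kafka versions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetKeySketch {

    /** Reads a length-prefixed string: a 2-byte length followed by UTF-8 bytes. */
    public static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Writes a length-prefixed string in the same layout. */
    public static void writeString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    /** Builds a sample key in the assumed layout, standing in for a real record key. */
    public static byte[] sampleKey(String group, String topic, int partition) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        buf.putShort((short) 1); // key version (assumed: 0 and 1 mark offset-commit keys)
        writeString(buf, group);
        writeString(buf, topic);
        buf.putInt(partition);
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }

    /** Decodes an offset-commit key; returns null for any other key version. */
    public static String describeKey(byte[] key) {
        ByteBuffer buf = ByteBuffer.wrap(key);
        short version = buf.getShort();
        if (version > 1) {
            return null; // assumed: higher versions are group metadata keys, not offsets
        }
        String group = readString(buf);
        String topic = readString(buf);
        int partition = buf.getInt();
        return group + " " + topic + "-" + partition;
    }

    public static void main(String[] args) {
        byte[] key = sampleKey("test", "orders", 3);
        // Printed as text, the binary length and version fields show up as control characters
        System.out.println(new String(key, StandardCharsets.UTF_8));
        // Decoded field by field, the same bytes are readable
        System.out.println(describeKey(key));
    }
}
```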
The answer to the question you linked contains skeleton code that does all of this.
Also note that you should not deserialize the records as strings. Keep them as raw bytes instead, so that these methods can decode them correctly.
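A minimal sketch of what that means in practice: configure the consumer with ByteArrayDeserializer (so records arrive as ConsumerRecord<byte[], byte[]>) rather than StringDeserializer. The helper names below are illustrative, not part of any Kafka API. The second method shows why the string route is lossy: binary fields generally do not survive a trip through a UTF-8 String.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

public class RawBytesConfig {

    /** Consumer properties that hand keys and values back as byte[] instead of String. */
    public static Properties consumerProps(String servers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    /** Returns true if the bytes come back unchanged after being forced through a UTF-8 String. */
    public static boolean survivesStringRoundTrip(byte[] original) {
        byte[] roundTripped = new String(original, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(original, roundTripped);
    }

    public static void main(String[] args) {
        // An 8-byte big-endian long, the kind of binary field these records carry
        byte[] binary = ByteBuffer.allocate(8).putLong(1_500_000_000_000L).array();
        System.out.println("binary survives: " + survivesStringRoundTrip(binary));

        byte[] text = "plain text".getBytes(StandardCharsets.UTF_8);
        System.out.println("text survives: " + survivesStringRoundTrip(text));
    }
}
```

With these properties, the consumer would be declared as KafkaConsumer<byte[], byte[]>, and the key and value of each record can be wrapped in a ByteBuffer before being passed to the decoding methods.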