如何让消费者在 Kafka 0.8 API 中工作 [英] How to get consumers to work in Kafka 0.8 API
问题描述
我即将编写一个用于发布和使用 kafka 消息的原型.我们确实已经设置了 Cloudera 基础设施(动物园管理员、代理等),而且我已经成功地使用了 Kafka 命令行工具来生成和使用消息.
I am about to write a prototype for publishing and consuming kafka messages. We do have a Cloudera infrastructure set up already (zookeepers, brokers, etc.), and I have played with the Kafka command-line tools successfully already, to produce and consume messages.
我正在使用 [org.apache.kafka/kafka_2.10 "0.8.2.1"]
作为依赖项,并且已经能够使用客户端 API 来设置 KafkaProducer
发布带有纯字符串内容的消息,并且可以被另一端的命令行消费者成功读取.
I am using [org.apache.kafka/kafka_2.10 "0.8.2.1"]
as dependency, and have already been able to use the client API to set up a KafkaProducer
which publishes messages with plain String content, and can be successfully read by the command-line consumer at the other side.
我的问题是:internets 上是否有一个代码示例来展示如何初始化一个 KafkaConsumer
,并在另一边阅读该消息,因为我已经搜索了好几天,但没有任何 代码示例 似乎有效:
My question is: Is there a single code example on the internets to show how to initialize a KafkaConsumer
, and read that message on the other side, because I have been searching for it for days and none of the code examples seem to be working:
- 他们使用的类或方法甚至不存在于 API 本身(例如,他们似乎将属性映射传递到
org.apache.kafka.clients.consumer.ConsumerConfig
的构造函数中,但不存在这样的构造函数; - 在
kafka.consumer.Consumer
类上调用createJavaConsumerConnector
静态方法......这些东西存在于哪个宇宙中?).
- They use classes or methods which are not even existing in he API itself (for example they seemingly pass the property-map into the constructor of
org.apache.kafka.clients.consumer.ConsumerConfig
, but no such constructor exists; - calling
createJavaConsumerConnector
static method on the classkafka.consumer.Consumer
... in which universe these things exist?).
而且通常每个示例看起来都非常复杂.我希望消息传递框架需要几行配置来连接到代理,以及一些功能来放置和接收队列或主题.为 Kafka 设置生产者并不是一件非常复杂的事情,我希望消费者也能类似.
And usually every example looks extremely over-complicated. I would expect a messaging framework to need a few lines of configuration for connecting to brokers, and some function to put and take to/from a queue or topic. Setting up the Producer for Kafka wasn't something extremely complicated, and I was expecting the Consumer to be similar.
看来我并不孤单 有了这个.
推荐答案
首先我想提一下,Kafka 0.8.0
、0.8.1 之间有一些 API 变化
和 0.8.2
(对 0.9.0
和 0.10.0
进行了重写和简化)——因此,你的问题有点开放,只是要求 0.8
.
First I want to mention, that there are a couple of API changes between Kafka 0.8.0
, 0.8.1
, and 0.8.2
(a mayor rewrite and simplification happened for 0.9.0
and 0.10.0
) -- thus, your question is a little open just asking for 0.8
.
要为 0.8.2.2
编写 Java 使用者,您需要包含依赖项:
To write a Java consumer for 0.8.2.2
you need to include dependency:
这适用于 Scala 2.11 -- 还有其他可用的 Scala 版本.
This is for Scala 2.11 -- there are other Scala version available, too.
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.2</version>
</dependency>
请勿使用 kafka-clients
作为 0.8.x 的 artifactId.
消费者接收
键值对消息并将它们打印到 stdout
的最小示例如下所示:
A minimum example for a consumer receiving <String,String>
key-value pair messages and prints them to stdout
looks as follows:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "myGroup");
final String topic = "test";
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1)); // number of consumer threads
KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// infinite loop
while(it.hasNext()) {
System.out.println(new String(it.next().message()));
}
// non-reachable code...
consumer.shutdown();
}
}
一个完整的例子——使用多个消费者线程,包括正确关闭——可以在这里找到:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
A full example -- using multiple consumer thread, including proper shutdown -- can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
要对此进行测试,请遵循快速入门指南并通过 Kafka 的 控制台制作者.
To test this, follow the quickstart guide and send messages via Kafka's console-producer.
这篇关于如何让消费者在 Kafka 0.8 API 中工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!