如何使用Kafka 0.8.2的Consumer API? [英] How to use Consumer API of Kafka 0.8.2?

查看:22
本文介绍了如何使用Kafka 0.8.2的Consumer API?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开始使用最新的 Kafka 文档 http://kafka.apache.org/documentation.html.但是当我尝试使用新的消费者 API 时遇到了一些问题.我已经通过以下步骤完成了这项工作:

I'm getting start with the latest Kafka document http://kafka.apache.org/documentation.html. But I meet some problem when I try to use the new Consumer API. I've done the job with following steps:

1.添加新的依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2.添加配置

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3.使用 KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

然而,当我尝试从代理轮询消息时,我得到的只有空值:

However, when I try to poll message from the broker, I got nothing but null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

然后我查看了源代码后才知道消费者有什么问题:

And then I know what's wrong with the consumer after I checked the source code:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

更糟糕的是,我找不到关于 0.8.2 API 的任何其他有用信息,因为所有关于 Kafka 的用法都与最新版本不兼容.有人可以帮我吗?非常感谢.

To make matters worse, I cannot find any other useful information about the 0.8.2 API, since all usages about Kafka are not compatible with the latest version. Could anybody help me? Thanks a lot.

推荐答案

我也在尝试在 Kafka 0.8.2.1 之上编写一个 Consumer 来读取新 Producer 产生的消息.

I am also trying to write a Consumer on top of Kafka 0.8.2.1 to read the messages produced by the new Producer.

到目前为止,我得到的是生产者 API 已准备就绪并可使用,而在消费者方面,我们必须等待 0.8.3,正如@habsq 所指出的,您已经发现为消费者包含了一些代码,但它仍然无法正常工作.

So far what I have got is that the Producer API is ready and usable, while on the consumer side we have to wait 0.8.3, as @habsq noted and you already find out that there is some code included for the Consumer, but it is still not functional.

因此要使用的客户端(当前的客户端 API)是在您当前 Kafka 版本的核心"项目中找到的客户端,即 0.8.2.1(最好不要将客户端降级到任何其他版本).

So the client to use (the current Client API) are the one found in the "core" project of your current Kafka version, i.e. 0.8.2.1 (better not downgrade the client to any other version).

所以现在我们需要导入两个 jar:一个用于新"java 客户端,一个用于核心项目,这也取决于您使用的 Scala 版本(我使用 2.11).

So for now we need to import two jars: one for the "new" java clients and one for the core project, depending also on the scala version you are using (I use 2.11).

就我而言,我使用 graddle 来管理依赖项,所以我只需要导入

In my case I use graddle to manage dependencies so I just need to import

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

当您更新依赖项时,它将获得所有需要的库.

When you update dependencies it will get all the needed libraries.

如果您使用不同的 Scala 版本,只需更改版本;无论如何,您可以在 maven central 上找到所有不同的版本或完整的 pom:http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

If you are using a different Scala version just change the version; anyway you can find all the different version or the full pom on maven central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

如果您使用那些 Consumer 实现,则所有当前示例都应该照常工作.

If you use those Consumer implementation all the current examples should work as usual.

PS 参考:Kafka-users ml 线程 http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

这篇关于如何使用Kafka 0.8.2的Consumer API?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆