如何使用Kafka 0.8.2的Consumer API? [英] How to use Consumer API of Kafka 0.8.2?

查看:651
本文介绍了如何使用Kafka 0.8.2的Consumer API?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我从最新的Kafka文档 http://kafka.apache.org/documentation开始. html .但是,当我尝试使用新的Consumer API时遇到了一些问题.我已经完成了以下步骤:

I'm getting start with the latest Kafka document http://kafka.apache.org/documentation.html. But I meet some problem when I try to use the new Consumer API. I've done the job with following steps:

1.添加新的依赖项

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2.添加配置

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3.使用KafkaConsumer API

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

但是,当我尝试轮询来自代理的消息时,除了null之外,我什么也没有:

However, when I try to poll message from the broker, I got nothing but null:

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

然后,在检查了源代码之后,我知道使用者有什么问题:

And then I know what's wrong with the consumer after I checked the source code:

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

更糟糕的是,我找不到关于0.8.2 API的任何其他有用信息,因为有关Kafka的所有用法均与最新版本不兼容.有人可以帮我吗?非常感谢.

To make matters worse, I cannot find any other useful information about the 0.8.2 API, since all usages about Kafka are not compatible with the latest version. Could anybody help me? Thanks a lot.

推荐答案

我还试图在Kafka 0.8.2.1之上编写一个Consumer,以读取新的Producer产生的消息.

I am also trying to write a Consumer on top of Kafka 0.8.2.1 to read the messages produced by the new Producer.

到目前为止,我所知道的是Producer API已准备就绪并且可以使用,而在使用者方面,我们必须等待0.8.3(如@habsq所指出的那样),并且您已经发现为Consumer使用了一些代码,但仍无法正常工作.

So far what I have got is that the Producer API is ready and usable, while on the consumer side we have to wait 0.8.3, as @habsq noted and you already find out that there is some code included for the Consumer, but it is still not functional.

因此要使用的客户端(当前的客户端API)是在您当前的Kafka版本(即0.8.2.1)的核心"项目中找到的客户端(最好不要将客户端降级到任何其他版本).

So the client to use (the current Client API) are the one found in the "core" project of your current Kafka version, i.e. 0.8.2.1 (better not downgrade the client to any other version).

因此,现在我们需要导入两个jar:一个用于新" java客户端,一个用于核心项目,这也取决于您所使用的scala版本(我使用2.11).

So for now we need to import two jars: one for the "new" java clients and one for the core project, depending also on the scala version you are using (I use 2.11).

在我的情况下,我使用graddle来管理依赖项,因此我只需要导入

In my case I use graddle to manage dependencies so I just need to import

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

更新依赖项时,它将获取所有需要的库.

When you update dependencies it will get all the needed libraries.

如果您使用的是其他Scala版本,只需更改版本即可;无论如何,您可以在Maven Central上找到所有不同的版本或完整的pom:

If you are using a different Scala version just change the version; anyway you can find all the different version or the full pom on maven central: http://search.maven.org/#search|ga|1|g%3A%22org.apache.kafka%22%20AND%20v%3A%220.8.2.1%22

如果使用这些Consumer实现,则所有当前示例都应照常运行.

If you use those Consumer implementation all the current examples should work as usual.

PS参考:Kafka-users ml线程 http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

PS ref: Kafka-users ml thread http://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

这篇关于如何使用Kafka 0.8.2的Consumer API?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆