如何使消费者在Kafka 0.8 API中工作 [英] How to get consumers to work in Kafka 0.8 API

查看:78
本文介绍了如何使消费者在Kafka 0.8 API中工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我将要编写一个原型,用于发布和使用kafka消息。
我们确实已经建立了Cloudera基础架构(动物园管理员,经纪人等),而且我已经成功地使用了Kafka命令行工具来生成和使用消息。



我正在使用 [org.apache.kafka / kafka_2.10 0.8.2.1] 作为依赖项,并且已经能够使用客户端API来设置 KafkaProducer ,它发布具有纯字符串内容的消息,并且可以由另一端的命令行使用者成功读取。



我的问题是:互联网展示如何初始化 KafkaConsumer ,并在另一侧阅读该消息,因为我已经搜索了好几天了,而且都没有代码示例似乎可以正常工作:




  • 它们使用cla API本身甚至不存在的sses或方法(例如,它们似乎将属性映射传递到 org.apache.kafka.clients.consumer.ConsumerConfig 的构造函数中,但不存在此类构造函数;

  • 在类 kafka.consumer.Consumer <上调用 createJavaConsumerConnector 静态方法。 / code> ...这些东西存在于宇宙中?)。



通常每个示例看起来都过于复杂。我希望消息传递框架需要几行配置才能连接到代理,并且需要一些功能来放入队列或主题或从队列或主题获取。为Kafka设置Producer并不是一件非常复杂的事情,而且我期望Consumer也是如此。



似乎我也是并不孤单与此。

解决方案

首先,我想说一下,Kafka 0.8.0 0.8.1 0.8.2 (市长重写并简化了 0.9.0 0.10.0 )-因此,您的问题有点开放,只要求 0.8



要为 0.8.2.2 编写Java使用者,您需要包括依赖项:


这是针对Scala 2.11的-还有其他Scala版本可用。




 < dependency> 
< groupId> org.apache.kafka< / groupId>
< artifactId> kafka_2.11< / artifactId>
< version> 0.8.2.2< / version>
< / dependency>

请勿使用 kafka客户端 作为0.8.x的artifactId。



接收< String的消费者的最小示例,String> 键值对消息并将它们打印到 stdout 看起来如下:



< pre class = lang-java prettyprint-override> import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

公共类ConsumerExample {

public static void main(String [] args){
Properties props = new Properties();
props.put( zookeeper.connect, localhost:2181);
props.put( group.id, myGroup);

final String topic = test;

ConsumerConnector消费者= Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

Map< String,Integer> topicCountMap = new HashMap< String,Integer>();
topicCountMap.put(topic,new Integer(1)); //使用者线程数

KafkaStream< byte [],byte []>流= Consumer.createMessageStreams(topicCountMap).get(topic).get(0);

ConsumerIterator< byte [],byte []>它= stream.iterator();

//无限循环
while(it.hasNext()){
System.out.println(new String(it.next()。message()));
}

//不可访问的代码...
Consumer.shutdown();
}
}

完整示例-使用多个使用者线程,包括正常关闭-可以在此处找到: https://cwiki.apache。 org / confluence / display / KAFKA / Consumer + Group + Example



要对此进行测试,请按照 quickstart 指南,并通过Kafka的控制台生产者发送消息。


I am about to write a prototype for publishing and consuming kafka messages. We do have a Cloudera infrastructure set up already (zookeepers, brokers, etc.), and I have played with the Kafka command-line tools successfully already, to produce and consume messages.

I am using [org.apache.kafka/kafka_2.10 "0.8.2.1"] as dependency, and have already been able to use the client API to set up a KafkaProducer which publishes messages with plain String content, and can be successfully read by the command-line consumer at the other side.

My question is: Is there a single code example on the internets to show how to initialize a KafkaConsumer, and read that message on the other side, because I have been searching for it for days and none of the code examples seem to be working:

  • They use classes or methods which are not even existing in he API itself (for example they seemingly pass the property-map into the constructor of org.apache.kafka.clients.consumer.ConsumerConfig, but no such constructor exists;
  • calling createJavaConsumerConnector static method on the class kafka.consumer.Consumer... in which universe these things exist?).

And usually every example looks extremely over-complicated. I would expect a messaging framework to need a few lines of configuration for connecting to brokers, and some function to put and take to/from a queue or topic. Setting up the Producer for Kafka wasn't something extremely complicated, and I was expecting the Consumer to be similar.

It also seems I am not alone with this.

解决方案

First I want to mention, that there are a couple of API changes between Kafka 0.8.0, 0.8.1, and 0.8.2 (a mayor rewrite and simplification happened for 0.9.0 and 0.10.0) -- thus, your question is a little open just asking for 0.8.

To write a Java consumer for 0.8.2.2 you need to include dependency:

This is for Scala 2.11 -- there are other Scala version available, too.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.8.2.2</version>
</dependency>

Do not use kafka-clients as artifactId for 0.8.x.

A minimum example for a consumer receiving <String,String> key-value pair messages and prints them to stdout looks as follows:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "myGroup");

        final String topic = "test";

        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1)); // number of consumer threads

        KafkaStream<byte[], byte[]> stream = consumer.createMessageStreams(topicCountMap).get(topic).get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // infinite loop
        while(it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }

        // non-reachable code...
        consumer.shutdown();
    }
}

A full example -- using multiple consumer thread, including proper shutdown -- can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

To test this, follow the quickstart guide and send messages via Kafka's console-producer.

这篇关于如何使消费者在Kafka 0.8 API中工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆