Java 中的 Kafka 消费者 [英] Kafka Consumer in Java

查看:56
本文介绍了Java 中的 Kafka 消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我目前正在学习 Kafka,并试图复制 Apache 提供的示例 这里.这是消费者的示例代码,我已经用 java 编写了它,如图所示.但是,当我尝试执行该文件时,我遇到了一些问题.我可以编译文件,但无法正常运行.

So I am learning Kafka currently and have attempted to duplicate the examples provided from Apache here. This is example code for the consumer and I have written it in java just as shown. When I attempt to execute the file however I run into some issues. I am able to get the file to compile but it will not run properly.

我使用以下不带引号的行执行程序,java TestConsumer localhost:2181 group1 test 4"这传递了示例代码中必需的 4 个参数.但是当我执行这个命令时,我得到了以下错误.

I am executing the program with the following line without the quotations, "java TestConsumer localhost:2181 group1 test 4" This passes the 4 arguments necessary in the example code. I am provided with the following error though when I execute this command.

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Category
    at kafka.utils.VerifiableProperties.<init>(Unknown Source)
    at kafka.consumer.ConsumerConfig.<init>(Unknown Source)
    at TestConsumer.ConsumerProps(TestConsumer.java:69)
    at TestConsumer.<init>(TestConsumer.java:31)
    at TestConsumer.main(TestConsumer.java:97)
    Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Category
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 5 more

我尝试用必要的值手动替换参数并尝试以这种方式执行,但我遇到了不同的问题.下面是错误消息以及我正在使用的代码,以防万一我从提供的示例中搞砸了一些东西.如果有人可以帮助我,我将非常感激,因为我正在尝试编写自己的使用者来测试解析给定的信息等.谢谢

I have tried going in an manually replacing the arguments with the necessary values and attempting to execute that way but I am given a different issue. Below is the error message along with the code I'm using just in case I screwed something up from the example provided. If anyone can help me out I would be incredibly appreciative since I am attempting to write my own consumer to test with parsing given information, etc. Thanks

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Exception in thread "main" java.lang.NoClassDefFoundError: org/I0Itec/zkclient/IZkStateListener
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(Unknown Source)
at kafka.consumer.Consumer$.createJavaConsumerConnector(Unknown Source)
at kafka.consumer.Consumer.createJavaConsumerConnector(Unknown Source)
at TestConsumer.<init>(TestConsumer.java:31)
at TestConsumer.main(TestConsumer.java:97)
Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.IZkStateListener
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 6 more

/*
 *	Test Consumer to gather input from
 *	a Producer. Attempt to perform functions
 *	from the produced data
*/


// Kafka API
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;


public class TestConsumer{
	
	private final ConsumerConnector consumer;
	private final String topic;
	private ExecutorService executor;
	
	
	// CONSTRUCTOR
	public TestConsumer(String zookeeper, String groupid, String aTopic){
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerProps(zookeeper, groupid));
		this.topic = aTopic;
	}
	// END CONSTRUCTOR
	
	
	// RUN FUNCTION
	public void run(int threads){
		Map<String, Integer> topicMap = new HashMap<String, Integer>();
		topicMap.put(topic, new Integer(threads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
		
		executor = Executors.newFixedThreadPool(threads);	// process threads
		
		int numThread = 0;	// thread counter for consumption
		
		// consumer all messages
		for(final KafkaStream stream : streams){
			executor.submit(new TestConsumerRun(stream, numThread));
			numThread ++;
		}
	}
	// END RUN FUNCTION
	
	
	// CREATE PROPERTIES FUNCTION
	private static ConsumerConfig ConsumerProps(String zookeeper, String groupid){
		
		Properties properties = new Properties();	// config properties file
		
		properties.put("zookeeper.connect", zookeeper);
		properties.put("group.id", groupid);
		properties.put("zookeeper.session.timeout.ms", "400");
		properties.put("zookeeper.sync.time.ms", "200");
		properties.put("auto.commit.interval.ms", "1000");
		properties.put("auto.offset.reset", "smallest");
		
		return new ConsumerConfig(properties);
	}
	// END CREATE PROPERTIES FUNCTION
	
	
	// SHUTDOWN FUNCTION
	public void shutdown(){
		if (consumer != null) consumer.shutdown();
		if (executor != null) executor.shutdown();
		
		try{
			if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)){
				System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
			}
			} catch (InterruptedException e){
				System.out.println("Interrupted during shutdown, exiting uncleanly");
		}
	}
	// END SHUTDOWN FUNCTION
	
	
	// MAIN FUNCTION
	public static void main(String[] args){
		String zookeeper = args[0];
		String groupid = args[1];
		String topic = args[2];
		int threads = Integer.parseInt(args[3]);
		
		TestConsumer test = new TestConsumer(zookeeper, groupid, topic);	// send information to constructor
		test.run(threads);	// pass threads for iteration
		
		try{
			Thread.sleep(10000);
		} catch (InterruptedException ie){
		}
		
		test.shutdown();	// close program
	}	
	// END MAIN FUNCTION
	
}

/*
 *	Test Consumer to gather input from
 *	a Producer. Attempt to perform functions
 *	from the produced data
*/

// Kafka API
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;


public class TestConsumerRun implements Runnable{
	
	private KafkaStream aStream;
	private int aThread;
	
	
	// CONSTRUCTOR
	public TestConsumerRun(KafkaStream stream, int thread){
		aStream = stream;	// set stream from main read
		aThread = thread;	// set thread from main read
	}
	// END CONSTRUCTOR
	
	
	// RUN FUNCTION
	public void run(){
		
		ConsumerIterator<byte[], byte[]> iterator = aStream.iterator();		// used to check throughout the list continiously
		
		while(iterator.hasNext())
			System.out.println("Thread " + aThread + ": " + new String(iterator.next().message()));
		System.out.println("Shutting down Thread: " + aThread);
		
	}
	// END RUN FUNCTION
}

推荐答案

尝试在 main 方法中添加 BasicConfigurator.configure(); ,它会正常工作.

Try adding BasicConfigurator.configure(); in the main method and it will work fine.

这篇关于Java 中的 Kafka 消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆