Why isn't this Kafka consumer shutting down?
Problem description
I expect the consume test to read just one message and then shut down. However, it does not, even though I call consumer.shutdown(). Why?
Test
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class AppTest
{
    App app = new App();

    @org.junit.Test
    public void publish()
    {
        assertTrue(app.publish("temptopic", "message"));
    }

    @org.junit.Test
    public void consume()
    {
        app.publish("temptopic", "message");
        assertEquals("message", app.consume("temptopic", "tempgroup"));
    }
}
Class under test
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class App
{
    public boolean publish(String topicName, String message) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The new producer reads "acks"; "request.required.acks" belongs to the old producer.
        p.put("acks", "1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        try {
            ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topicName, "mykey", message);
            // Wait for the broker to acknowledge the record.
            producer.send(record).get();
        } catch (Exception e) {
            return false;
        } finally {
            producer.close();
        }
        return true;
    }

    public String consume(String topicName, String groupId)
    {
        Properties p = new Properties();
        p.put("zookeeper.connect", "localhost:2181");
        p.put("group.id", groupId); // use the parameter, not the literal "groupId"
        p.put("zookeeper.session.timeout.ms", "500");
        p.put("zookeeper.sync.time.ms", "250");
        p.put("auto.commit.interval.ms", "1000");
        ConsumerConnector consumer = null;
        String result = "";
        try
        {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
            Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
            topicPartitionCount.put(topicName, Integer.valueOf(1)); // use 1 thread to read
            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                consumer.createMessageStreams(topicPartitionCount);
            List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
            for (KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> msg = it.next();
                    String strMsg = new String(msg.message());
                    System.out.println("received: " + strMsg);
                    result += strMsg;
                }
            }
        }
        finally
        {
            if (consumer != null)
            {
                consumer.shutdown();
            }
        }
        return result;
    }
}
Recommended answer
By default, the ConsumerIterator blocks indefinitely while it waits for the next message. As long as no new message arrives, it.hasNext() does not return, so the while loop never ends and the finally block that calls consumer.shutdown() is never reached. To get out of the while loop, you would need to interrupt the thread.
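The blocking behaviour can be reproduced without Kafka. The following is a minimal sketch, assuming a plain java.util.concurrent.BlockingQueue as a stand-in for the message stream; the class BlockingLoopDemo and the method consumeUntilInterrupted are hypothetical names used only for illustration and are not part of any Kafka API. It shows a loop that waits forever for the next element and only exits once another thread interrupts it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a blocking consumer loop; this is not Kafka's API.
class BlockingLoopDemo {

    // Reads from the queue on a worker thread. Once the queue has been drained,
    // the caller interrupts the worker so its otherwise endless loop can exit.
    static List<String> consumeUntilInterrupted(BlockingQueue<String> queue)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Like a blocking hasNext(): waits while no message is available.
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                // In this sketch, interruption is the only way out of the loop.
            }
        });
        worker.start();
        // Wait until the worker has taken every queued message.
        while (!queue.isEmpty()) {
            Thread.sleep(10);
        }
        worker.interrupt();
        worker.join(); // after join(), reading 'received' from this thread is safe
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("message");
        System.out.println(consumeUntilInterrupted(queue)); // prints [message]
    }
}
```

In a test, the same idea would mean running the consume call on its own thread and interrupting that thread once the expected message has been seen.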
There is also a consumer property, "consumer.timeout.ms". If it is set to a value, a timeout exception is thrown to the consumer when no message becomes available within that many milliseconds. This may not be the most practical solution for unit testing, though.
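As a rough sketch of the timeout approach, the consumer properties could be built as shown here. The class TimeoutConsumerConfig and its build method are hypothetical helpers introduced for this example; the property names are the ones used by the old ZooKeeper-based high-level consumer in the question, and the 1000 ms value is only an assumed placeholder.

```java
import java.util.Properties;

// Hypothetical helper; only the property names come from the Kafka consumer configuration.
class TimeoutConsumerConfig {

    // Builds consumer properties that make a blocked iterator give up after timeoutMs.
    static Properties build(String zookeeper, String groupId, int timeoutMs) {
        Properties p = new Properties();
        p.put("zookeeper.connect", zookeeper);
        p.put("group.id", groupId);
        // Without this property the iterator waits indefinitely for the next message.
        p.put("consumer.timeout.ms", Integer.toString(timeoutMs));
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:2181", "tempgroup", 1000);
        System.out.println(p.getProperty("consumer.timeout.ms")); // prints 1000
    }
}
```

With properties like these passed to new ConsumerConfig(p), it.hasNext() should throw kafka.consumer.ConsumerTimeoutException once the timeout elapses without a message. Catching that exception around the while loop in consume() would let the method fall through to the finally block, call consumer.shutdown(), and return the result collected so far.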