为什么Kafka消费者性能很慢? [英] Why Kafka consumer performance is slow?

查看:27
本文介绍了为什么Kafka消费者性能很慢?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的主题,一个简单的 Kafka 消费者和生产者,使用默认配置.

I have one simple topic, and one simple Kafka consumer and producer, using the default configuration.

程序很简单,我有两个线程.

The program is very simple, I have two threads.

在生产者中,它不断发送 16 个字节的数据.

In the producer, it keeps sending 16 bytes data.

在消费者端,它不断接收.

And in consumer side, it keeps receiving.

我发现生产者的吞吐量大约为 10MB/s,这很好.

I found the fact that, the throughput for producer is roughly 10MB/s, that is fine.

但消费者的吞吐量仅为 0.2MB/s.我已经禁用了所有调试日志,但这并没有让它变得更好.测试在本地机器上运行.任何机构都知道出了什么问题?谢谢!

But the throughput for consumer is only 0.2MB/s. I have disabled all the debugging logs but that does not make it any better. The test is running on local machine. Any body has an idea on what is going wrong? Thanks!

我使用的代码如下:制作人:

The code I used is below: Producer:

KafkaProducer producer = new KafkaProducer(props);
int size = 16;
byte[] payload = new byte[size];
String key = "key";
Arrays.fill(payload, (byte) 1);
ProducerRecord record = new ProducerRecord("test",0,key.getBytes(),payload);
while(true){
producer.send(record);
}

消费者:

Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "localhost:2181");
consumerProps.put("group.id", "test");
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test");
ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
while(it.hasNext()){
    it.next().message();
}

推荐答案

尝试使用以下属性配置消费者.

Try configuring Consumers with the following properties.

  1. fetch.min.bytes
  2. fetch.max.wait.ms
  3. max.partition.fetch.bytes

此外,您可以调整poll() 方法的超时参数以提高吞吐量.

Also, you can adjust a timeout parameter of the poll() method for throughput.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

这篇关于为什么Kafka消费者性能很慢?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆