Why is the Camel Kafka producer very slow?

Problem description

I am using Apache Camel Kafka as the client for producing messages. What I observed is that the Kafka producer takes 1 ms to push a message, and if I merge messages into a batch using Camel aggregation, it takes 100 ms to push a single message.

Brief description of the installation: a Kafka cluster of 3 nodes, 16 cores, 32 GB RAM.

Sample code

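    // Camel Kafka endpoint: no linger, one in-flight request per connection, 64 KB batches
    String endpoint = "kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092"
            + "&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536";
    Message message = new Message();   // application-specific payload class (not shown)
    String payload = new ObjectMapper().writeValueAsString(message);
    StopWatch stopWatch = new StopWatch();   // assumes org.apache.camel.util.StopWatch, which starts when created
    for (int i = 0; i < size; i++) {
        // sendBody blocks until the Kafka endpoint has processed this exchange
        producerTemplate.sendBody(endpoint, ExchangePattern.InOnly, payload);
    }
    logger.info("Time taken to push {} messages is {} ms", size, stopWatch.taken());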

Camel producer endpoint

kafka:[topic]?topic=[topic]&brokers=[brokers]&maxInFlightRequest=1

I am getting a throughput of about 1,000 messages/s, even though the Kafka documentation claims producer throughput of around 100,000 messages/s.
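
The observed rate is roughly what a loop of blocking sends predicts. Below is a back-of-envelope sketch, assuming each send waits for its full round trip (about 1 ms in the question's logs) and that sends do not overlap, as with one calling thread and maxInFlightRequest=1. Under those assumptions throughput is capped at concurrency divided by per-message latency (Little's law). The class and method names are hypothetical.

```java
/** Hypothetical helper: upper bound on throughput for blocking senders (Little's law). */
public final class ThroughputCeiling {

    private ThroughputCeiling() {}

    /**
     * @param latencyMs       time one blocking send takes, in milliseconds
     * @param concurrentSends number of sends that can be in progress at once
     * @return the most messages per second those senders can complete
     */
    public static double maxMessagesPerSecond(double latencyMs, int concurrentSends) {
        if (latencyMs <= 0 || concurrentSends <= 0) {
            throw new IllegalArgumentException("latency and concurrency must be positive");
        }
        return concurrentSends * 1000.0 / latencyMs;
    }

    public static void main(String[] args) {
        // One caller thread, ~1 ms per synchronous send
        System.out.println(maxMessagesPerSecond(1.0, 1));   // prints 1000.0
        // ~100 overlapping sends (or batched records) at the same latency
        System.out.println(maxMessagesPerSecond(1.0, 100)); // prints 100000.0
    }
}
```

At 1 ms per send, reaching the advertised figure would need on the order of 100 sends or batched records in flight at once, which a single synchronous loop does not provide.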

Let me know if there is a bug in camel-kafka or in Kafka itself.

Producer configuration

        acks = 1
        batch.size = 65536
        bootstrap.servers = [nodekfa:9092, nodekfb:9092, nodekfc:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 1
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer


Test log

DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,784]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,785]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
INFO  [2019-06-02 17:30:46,788]  c.g.p.f.a.MessageApiController: Time taken to push 5 message is 10ms
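
Summing the per-send timings in the log shows that they account for the whole 10 ms reported for 5 messages: about 2 ms per message, or roughly 500 messages/s on this run, with each send starting only after the previous one finished. A JDK-only sketch that extracts those numbers; the class name and the regular expression are assumptions tailored to the log format shown, and the URIs are shortened.

```java
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: summarizes the "Took N millis to send" lines from the test log. */
public final class SendLogSummary {

    // Matches only the per-send timing lines, not the "for the exchange on the route" lines.
    private static final Pattern SEND_TIME =
            Pattern.compile("Took (\\d+) millis to send to external system");

    private SendLogSummary() {}

    /** Returns {total millis, number of sends} over all matching lines. */
    public static long[] summarize(List<String> lines) {
        long totalMs = 0;
        long sends = 0;
        for (String line : lines) {
            Matcher m = SEND_TIME.matcher(line);
            if (m.find()) {
                totalMs += Long.parseLong(m.group(1));
                sends++;
            }
        }
        return new long[] {totalMs, sends};
    }

    public static void main(String[] args) {
        // The five send timings from the log above (URIs shortened)
        List<String> log = List.of(
                ">>> Took 3 millis to send to external system : kafka://test",
                ">>> Took 2 millis to send to external system : kafka://test",
                ">>> Took 2 millis to send to external system : kafka://test",
                ">>> Took 1 millis to send to external system : kafka://test",
                ">>> Took 2 millis to send to external system : kafka://test");
        long[] s = summarize(log);
        double avgMs = (double) s[0] / s[1];
        // prints: 5 sends, 10 ms total, 2.0 ms average, ~500 msg/s
        System.out.printf(Locale.ROOT, "%d sends, %d ms total, %.1f ms average, ~%.0f msg/s%n",
                s[1], s[0], avgMs, 1000.0 / avgMs);
    }
}
```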


It is clearly taking at least 1 ms per message. The default worker pool max size is 20, and if I set the compression codec to snappy, performance gets even worse.

Let me know what I am missing!

Recommended answer

I am facing the same issue. Based on this email thread, https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html, I used the Aggregate EIP (https://camel.apache.org/manual/latest/aggregate-eip.html) to create batches and got better performance:

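    // Correlate every message into one group (constant(true)) and emit a batch of 3.
    // ArrayListAggregationStrategy: application-defined strategy (not a Camel built-in)
    // that collects the message bodies into a List.
    from("direct:dp.events")
        .aggregate(constant(true), new ArrayListAggregationStrategy())
            .completionSize(3)
            .to(kafkaUri)
            // log throughput stats every 1000 ms, after an initial 500 ms delay
            .to("log:out?groupInterval=1000&groupDelay=500")
        .end();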
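
For readers unfamiliar with the Aggregate EIP, the grouping done by aggregate(constant(true), ...).completionSize(3) can be pictured with this JDK-only sketch: every message joins a single group, and a batch is released as soon as it holds three messages. It is an illustration, not Camel's implementation, and the class name is hypothetical. One difference: this sketch flushes a trailing partial batch, whereas a route relying on completionSize alone would leave a final group of fewer than 3 messages waiting for more messages unless another completion condition (such as completionTimeout) is configured.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of size-based batching (one group, fixed completion size). */
public final class SizeBatcher {

    private SizeBatcher() {}

    public static <T> List<List<T>> batch(List<T> messages, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(size);
        for (T message : messages) {
            current.add(message);
            if (current.size() == size) {   // completion condition reached: release the batch
                batches.add(current);
                current = new ArrayList<>(size);
            }
        }
        // Unlike completionSize on its own, flush whatever is left over
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[m1, m2, m3], [m4, m5, m6], [m7]]
        System.out.println(batch(List.of("m1", "m2", "m3", "m4", "m5", "m6", "m7"), 3));
    }
}
```

Each batch reaches the Kafka endpoint as one exchange, so the fixed cost of a synchronous call is presumably paid once per batch rather than once per message.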

I got:

 INFO  Received: 1670 new messages, with total 13949 so far. Last group took: 998 millis which is: 1,673.347 messages per second. average: 1,262.696

This is using one Azure Event Hub through the Kafka protocol, with one partition. The odd thing is that when I use another Event Hub with 5 partitions, I get worse performance than in the 1-partition case.

I was able to get 3K messages per second by increasing workerPoolCoreSize and workerPoolMaxSize, in addition to adding partition keys to the messages and adding aggregation before sending to the Kafka endpoint.
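
Why more concurrency helps can be pictured with a simulation, assuming each send blocks for about 1 ms as in the question's logs; Thread.sleep stands in for waiting on the broker. This is not camel-kafka's code: in camel-kafka, workerPoolCoreSize and workerPoolMaxSize size the pool that continues routing an exchange after the broker acknowledges an asynchronous send. The sketch only shows that letting more blocking sends overlap raises throughput. Class and method names are hypothetical, and the printed rates depend on the machine.

```java
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simulation of blocking sends spread over a fixed-size thread pool. */
public final class ParallelSendSimulation {

    private ParallelSendSimulation() {}

    /** Sends `messages` fake messages with `threads` workers; returns how many completed. */
    public static int run(int messages, int threads, long sendMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger sent = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(sendMillis);   // stand-in for waiting on a synchronous send
                    sent.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int threads : new int[] {1, 20}) {
            long start = System.nanoTime();
            int sent = run(1000, threads, 1);
            double seconds = (System.nanoTime() - start) / 1e9;
            System.out.printf(Locale.ROOT, "%2d threads: %d messages, ~%.0f msg/s%n",
                    threads, sent, sent / seconds);
        }
    }
}
```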
