Why is camel kafka producer very slow?

Question

I am using Apache Camel Kafka as a client for producing messages. What I observe is that the Kafka producer takes 1 ms to push a message, but if I merge messages into a batch using Camel aggregation, it takes 100 ms to push a single message.

Brief description of the installation: a 3-node Kafka cluster, 16 cores, 32 GB RAM.

Sample code

    String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536";      
    Message message = new Message();
    String payload = new ObjectMapper().writeValueAsString(message);
    StopWatch stopWatch = new StopWatch();
    stopWatch.watch();
    for (int i=0;i<size;i++)
    {
        producerTemplate.sendBody(endpoint,ExchangePattern.InOnly, payload);
    }
    logger.info("Time taken to push {} message is {}",size,stopWatch.getElasedTime());

Camel producer endpoint

kafka:[topic]?topic=[topic]&brokers=[brokers]&maxInFlightRequest=1

I am getting a throughput of 1000 messages/s, even though the Kafka documentation claims producer throughput of around 100,000 TPS.

Let me know if there is any bug in camel-kafka or in Kafka itself.

Producer configuration

        acks = 1
        batch.size = 65536
        bootstrap.servers = [nodekfa:9092, nodekfb:9092, nodekfc:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 1
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer


Test log

DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,784]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,785]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
INFO  [2019-06-02 17:30:46,788]  c.g.p.f.a.MessageApiController: Time taken to push 5 message is 10ms


It clearly takes a minimum of 1 ms per message. The default worker pool max size is 20, and if I set the compression codec to snappy, performance gets even worse.
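
For context, both knobs are camel-kafka endpoint options. The following is only a sketch of how they would appear on the producer URI, using the documented defaults and the broker list from the question rather than any recommended values:

    // Sketch only: the camel-kafka producer URI with the options discussed above spelled out.
    // workerPoolCoreSize/workerPoolMaxSize size the producer's callback worker pool
    // (defaults 10/20); compressionCodec selects producer-side compression.
    String sketchEndpoint = "kafka:test"
            + "?brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092"
            + "&lingerMs=0&producerBatchSize=65536"
            + "&workerPoolCoreSize=10&workerPoolMaxSize=20"  // the defaults mentioned above
            + "&compressionCodec=none";                      // snappy made things worse here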

Let me know what I am missing!!

Answer

I am facing the same issue. Following this mailing-list thread, https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html, I used the Aggregate EIP (https://camel.apache.org/manual/latest/aggregate-eip.html) to create batches and got better performance:

from("direct:dp.events")
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionSize(3)
.to(kafkaUri)
.to("log:out?groupInterval=1000&groupDelay=500")
.end();
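
The ArrayListAggregationStrategy referenced in the route is not shown in the answer. A minimal sketch of such a strategy, which simply collects the incoming message bodies into one list per aggregated exchange, could look like this (Camel 2.x package name; in Camel 3 the interface lives at org.apache.camel.AggregationStrategy):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.camel.Exchange;
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    // Sketch of an aggregation strategy that gathers message bodies into an ArrayList.
    public class ArrayListAggregationStrategy implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            Object newBody = newExchange.getIn().getBody();
            if (oldExchange == null) {
                // First message of the batch: start a new list
                List<Object> list = new ArrayList<>();
                list.add(newBody);
                newExchange.getIn().setBody(list);
                return newExchange;
            }
            // Subsequent messages: append to the list carried by the aggregated exchange
            @SuppressWarnings("unchecked")
            List<Object> list = oldExchange.getIn().getBody(List.class);
            list.add(newBody);
            return oldExchange;
        }
    }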

And I got:

 INFO  Received: 1670 new messages, with total 13949 so far. Last group took: 998 millis which is: 1,673.347 messages per second. average: 1,262.696

This is using one Azure Event Hub over the Kafka protocol with a single partition. The weird thing is that when I use another Event Hub with 5 partitions, I get worse performance compared to the 1-partition example...

I was able to get 3K messages per second by increasing workerPoolCoreSize and workerPoolMaxSize, in addition to adding partition keys to the messages and adding aggregation before sending to the Kafka endpoint.
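
As a rough sketch of how those pieces could fit together (the worker-pool values, completion size, and the use of the exchange id as the record key are illustrative assumptions, not the answerer's actual settings; KafkaConstants.KEY is the camel-kafka header that becomes the Kafka record key):

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;

    public class BatchedKafkaRoute extends RouteBuilder {
        @Override
        public void configure() {
            // Larger producer worker pool than the 10/20 defaults (illustrative values)
            String kafkaUri = "kafka:test"
                    + "?brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092"
                    + "&workerPoolCoreSize=20&workerPoolMaxSize=40";

            from("direct:dp.events")
                // Give each message a key so records spread across partitions;
                // the aggregated exchange keeps the key of the first message in each batch
                .setHeader(KafkaConstants.KEY, simple("${exchangeId}"))
                // Batch messages before handing them to the Kafka producer, as in the answer
                .aggregate(constant(true), new ArrayListAggregationStrategy())
                .completionSize(100)
                .to(kafkaUri);
        }
    }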
