How to efficiently produce messages out of a collection to Kafka


Question


In my Scala (2.11) stream application I am consuming data from a queue in IBM MQ and writing it to a Kafka topic that has one partition. After consuming a message from MQ, its payload gets split into 3000 smaller messages that are stored in a sequence of strings. Then each of these 3000 messages is sent to Kafka (version 2.x) using a KafkaProducer.


How would you send those 3000 messages?


I can't increase the number of queues in IBM MQ (not under my control) nor the number of partitions in the topic (ordering of messages is required, and writing a custom partitioner will impact too many consumers of the topic).

The producer settings are currently:

  • acks = 1
  • linger.ms = 0
  • batch.size = 65536


But optimizing them is probably a question of its own and not part of my current problem.
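For context, the settings listed above would be applied to a properties object roughly like this. The bootstrap address and serializers are assumptions, not part of the original question:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

// Hypothetical setup matching the settings listed above;
// the broker address and serializers are placeholders.
val someProperties = new Properties()
someProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
someProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
someProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
someProperties.put(ProducerConfig.ACKS_CONFIG, "1")
someProperties.put(ProducerConfig.LINGER_MS_CONFIG, "0")
someProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536")
```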

Currently, I am doing it this way:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

private lazy val kafkaProducer: KafkaProducer[String, String] = new KafkaProducer[String, String](someProperties)
val messages: Seq[String] = Seq(String1, …, String3000)
for (msg <- messages) {
    val future = kafkaProducer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
    // blocks until the broker acknowledges this record before the next one is sent
    val recordMetadata = future.get()
}


To me this doesn't look like the most elegant or the most efficient way. Is there a programmatic way to increase throughput?


Thanks to the answer pointing me in the right direction, I had a closer look at the different producer methods. The book Kafka: The Definitive Guide lists these methods:


Fire-and-forget We send a message to the server and don't really care if it arrives successfully or not. Most of the time it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages will get lost using this method.


Synchronous send We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.


Asynchronous send We call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker.


And now my code looks like this (leaving out error handling and the definition of the Callback class):

  val asyncProducer = new KafkaProducer[String, String](someProperties)

  for (msg <- messages) {
    val record = new ProducerRecord[String, String](someTopic, someKey, msg)
    // hands the record to the producer's internal buffer and returns immediately
    asyncProducer.send(record, new compareProducerCallback)
  }
  // wait until all buffered records have actually been sent
  asyncProducer.flush()
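The compareProducerCallback class is omitted above; a minimal implementation of Kafka's Callback interface might look like the following sketch (the error handling is illustrative):

```scala
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Minimal callback: invoked by the producer's I/O thread once the broker
// responds (or the send fails). Keep the body cheap - it runs on that thread.
class compareProducerCallback extends Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      // a real application would log or collect the failure here
      exception.printStackTrace()
    }
  }
}
```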


I have compared all three methods for 10000 very small messages. Here are my measurements:

  1. Fire-and-forget: 173683464 ns
  2. Synchronous send: 29195039875 ns
  3. Asynchronous send: 44153826 ns
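The question doesn't show the timing harness; a simple way to take such measurements is to wrap each variant with System.nanoTime (the helper below is a sketch, not from the original):

```scala
// Hypothetical timing helper: sendAll stands in for the fire-and-forget,
// synchronous, or asynchronous send loop being measured.
def timed(sendAll: () => Unit): Long = {
  val start = System.nanoTime()
  sendAll()
  System.nanoTime() - start // elapsed nanoseconds
}
```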


To be honest, there is probably more potential to optimize all of them by choosing the right properties (batch.size, linger.ms, ...).

Answer


The biggest reason I can see for your code being slow is that you're waiting on every single send future.


Kafka was designed to send batches. By sending one record at a time you're paying a round-trip time for every single record, and you're not getting any benefit from compression.


The "idiomatic" thing to do would be to send everything, and then block on all the resulting futures in a second loop.
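That two-loop pattern could be sketched like this, reusing the hypothetical names from the question (someProperties, someTopic, someKey, messages):

```scala
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

val producer = new KafkaProducer[String, String](someProperties)

// First loop: queue all records without blocking, collecting the futures.
val futures: Seq[Future[RecordMetadata]] = messages.map { msg =>
  producer.send(new ProducerRecord[String, String](someTopic, someKey, msg))
}

// Second loop: block only after everything has been queued, so the
// producer can fill batches instead of doing one round trip per record.
futures.foreach(_.get())
```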


Also, if you intend to do this, I'd bump linger.ms back up (otherwise your first record would result in a batch of size one, slowing you down overall; see https://en.wikipedia.org/wiki/Nagle%27s_algorithm) and call flush() on the producer once your send loop is done.
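Bumping linger back up is a one-line property change; the value below is illustrative, not a recommendation from the answer:

```scala
// Let the producer wait briefly so early records can accumulate
// into a batch instead of each going out alone. 50 ms is illustrative.
someProperties.put("linger.ms", "50")
```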
