关闭 Kafka 连接 [英] Closing a Kafka connection

查看:49
本文介绍了关闭 Kafka 连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个应用程序应该向 Kafka 发送有限数量的消息然后退出.出于某种原因,即使我关闭了生产者,Kafka 连接也会保持正常.我的实现(在 Scala 中)或多或少

I have an application that should send a finite number of messages to Kafka and then quit. For some reason, the Kafka connection stays up even if I close the producer. My implementation (in Scala) is more or less

object Kafka {
  private val props = new Properties()

  props.put("compression.codec", DefaultCompressionCodec.codec.toString)
  props.put("producer.type", "sync")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("batch.num.messages", "200")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", "myclient")

  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
  private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)

  def send(msg: Message) = Try(producer.send(encode(msg)))
  def close() = producer.close()
}

这里的 Message 是一个简单的 case 类,我如何将它转换为字节数组并不重要.

Here Message is a simple case class, and how I convert it to byte array is not really relevant.

消息确实到达了,但是当我最终调用 Kafka.close() 时,应用程序没有退出,并且连接似乎没有被释放.

The messages do arrive, but when I eventually call Kafka.close(), the application does not exit, and the connection does not seem to be released.

有没有办法明确要求Kafka终止连接?

Is there a way to explicitly ask Kafka to terminate the connection?

推荐答案

def close() = producer.close()

def close() = producer.close()

这将创建一个名为close"的函数,该函数调用 producer.close()我看不到任何证据表明您的代码实际上关闭了生产者.

This creates a function called "close" that calls producer.close() I see no evidence that your code actually closes the producer.

您只需致电:生产者.close

You need to just call: producer.close

这篇关于关闭 Kafka 连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆