Kafka producer hangs on send


Question

I have a streaming job that reads data from a custom source and has to write it both to Kafka and to HDFS.

I wrote a (very) basic Kafka producer to do this, but the whole streaming job hangs on the send method.

class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val sslCertificatePassword: String) {

  val kafkaProps: Properties = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
  kafkaProps.put("acks", "1")
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("ssl.truststore.location", sslCertificatePath)
  kafkaProps.put("ssl.truststore.password", sslCertificatePassword)

  val kafkaProducer: KafkaProducer[Long, Array[String]] = new KafkaProducer(kafkaProps)

  def sendKafkaMessage(message: Message): Unit = {
    message.data.foreach(list => {
      val producerRecord: ProducerRecord[Long, Array[String]] = new ProducerRecord[Long, Array[String]](kafkaTopic, message.timeStamp.getTime, list.toArray)
      kafkaProducer.send(producerRecord)
    })
  }
}

And the code that calls the producer:

receiverStream.foreachRDD(rdd => {
      val messageRowRDD: RDD[Row] = rdd.mapPartitions(partition => {
        val parser: Parser = new Parser
        val kafkaProducer: KafkaProducer = new KafkaProducer(kafkaBootstrapServers, kafkaTopic, kafkaSslCertificatePath, kafkaSslCertificatePass)
        val newPartition = partition.map(message => {
          Logger.getLogger("importer").error("Writing Message to Kafka...")
          kafkaProducer.sendKafkaMessage(message)
          Logger.getLogger("importer").error("Finished writing Message to Kafka")
          message.data.map(singleMessage => parser.parseMessage(message.timeStamp.getTime, singleMessage))
        })
        newPartition.flatten
      })

      val df = sqlContext.createDataFrame(messageRowRDD, Schema.messageSchema)

      Logger.getLogger("importer").info("Entries-count: " + df.count())
      val row = Try(df.first)

      row match {
        case Success(s) => Persister.writeDataframeToDisk(df, outputFolder)
        case Failure(e) => Logger.getLogger("importer").warn("Resulting DataFrame is empty. Nothing can be written")
      }
    })

From the logs I can tell that each executor reaches the "sending to kafka" point but gets no further. All executors hang there and no exception is thrown.
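Since nothing is thrown, a stall like this is easy to miss. A minimal sketch of one way to make it visible, assuming the result of send is awaited with a deadline: KafkaProducer.send returns a java.util.concurrent.Future, and the hypothetical helper below (BoundedWait and awaitWithTimeout are illustrative names, not part of the original code) turns a timeout or a failed send into a Failure that can be logged.

```scala
import java.util.concurrent.{CompletableFuture, Future => JFuture, TimeUnit}
import scala.util.Try

// Hypothetical helper, not part of the question's code.
object BoundedWait {
  // Waits at most timeoutMs for the future to complete.
  // A value becomes Success; a timeout (TimeoutException) or a failed
  // computation (ExecutionException) becomes Failure instead of blocking forever.
  def awaitWithTimeout[T](future: JFuture[T], timeoutMs: Long): Try[T] =
    Try(future.get(timeoutMs, TimeUnit.MILLISECONDS))
}
```

With the producer from the question this could be used as BoundedWait.awaitWithTimeout(kafkaProducer.send(producerRecord), 10000L), logging the Failure case. Note that this only bounds the wait for the acknowledgement: send itself can block for up to max.block.ms while it waits for metadata or buffer space, before it returns the future at all.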

The Message class is a very simple case class with two fields: a timestamp and an array of strings.

Recommended answer

This was due to the acks setting of the Kafka producer.

Once acks was set to 1, sends went ahead a lot faster.
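As a rough sketch of what that configuration might look like, the hypothetical helper below (ProducerSettings.build is an illustrative name) sets acks explicitly and also bounds how long the producer may block. The keys acks, max.block.ms and request.timeout.ms are standard Kafka producer settings; the timeout values are assumptions chosen for illustration and were not part of the answer.

```scala
import java.util.Properties

// Hypothetical helper, not part of the original answer.
object ProducerSettings {
  // acks = "0": do not wait for any acknowledgement
  // acks = "1": wait for the partition leader only
  // acks = "all" (or "-1"): wait for all in-sync replicas
  def build(bootstrapServers: String, acks: String = "1"): Properties = {
    require(Set("0", "1", "all", "-1").contains(acks), s"unsupported acks value: $acks")
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("acks", acks)
    // Assumed values: upper bound on how long send() may block waiting for
    // metadata or buffer space, and on how long a request may stay unanswered.
    props.setProperty("max.block.ms", "10000")
    props.setProperty("request.timeout.ms", "15000")
    props
  }
}
```

The serializer and SSL truststore entries from the question would still have to be added to the returned Properties before it is passed to the producer.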
