Spark Streaming Kafka direct stream processing time performance spikes

Question

I have a Spark Streaming job that reads data from a Kafka cluster using the direct approach. There is a cyclical spike in processing times that I cannot explain and that is not reflected in the Spark UI metrics. The following image shows this pattern (batch time = 10s):

[image: graph of batch processing times showing a repeating spike pattern]

This issue is reproducible every time the job is run. There is no data in the Kafka logs to read, so no real processing of note is performed. I would expect the line to be flat, near the minimum time it takes to serialize and ship the tasks to the executors.

The pattern is: one job takes 9 seconds (5 of which are scheduler delay), the next job takes 5 seconds (no scheduler delay), and the next two jobs take roughly 0.8 and 0.2 seconds.

According to the Spark UI, the 9- and 5-second jobs don't appear to do any more work than the fast ones (apart from the scheduler delay).
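
For reference, the per-batch delays the UI reports can also be logged straight from the driver with a StreamingListener; here is a minimal sketch (the listener class name is just illustrative, but the API is the standard Spark 1.5 one backing the UI numbers):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs the same delays the Spark UI aggregates, so a spike can be
// matched to a specific batch in the driver log.
class BatchDelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"batch=${info.batchTime} " +
      s"schedulingDelay=${info.schedulingDelay.getOrElse(-1L)}ms " +
      s"processingDelay=${info.processingDelay.getOrElse(-1L)}ms")
  }
}

// Register before streamingContext.start():
// streamingContext.addStreamingListener(new BatchDelayLogger)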

Here is the task time summary for the 5-second job:

[image: task time summary for the 5-second job]

None of the executors are taking anywhere near 5 seconds to complete their tasks.

Has anyone else come across this, or do you have any suggestions as to what might be causing it?

Here is a stripped-down version of the main streaming code:

// Imports for the library calls shown; app-specific types (RuntimeConfig,
// KafkaConfig, Topics, AppConfig, ...) are omitted from this excerpt.
import com.fasterxml.uuid.{EthernetAddress, Generators}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.joda.time.format.DateTimeFormat

def main(args: Array[String]): Unit = {
  val (runtimeConfig: RuntimeConfig, cassandraConfig: CassandraConfig.type, kafkaConfig: KafkaConfig.type,
       streamingContext: StreamingContext) = loadConfig(args)

  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> kafkaConfig.metadataBrokerList,
    "fetch.message.max.bytes" -> kafkaConfig.fetchMessageMaxBytes)
  val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext, kafkaParams, Set(runtimeConfig.kafkaTopic))
  val uuidGenerator = streamingContext.sparkContext.broadcast(
    Generators.timeBasedGenerator(EthernetAddress.fromInterface()))

  runtimeConfig.kafkaTopic match {
    case Topics.edges => saveEdges(runtimeConfig, messages, uuidGenerator)
    case Topics.messages =>
      val formatter = streamingContext.sparkContext.broadcast(DateTimeFormat.forPattern(AppConfig.dateFormat))
      saveMessages(cassandraConfig, runtimeConfig, messages, formatter)
  }

  streamingContext.start()
  streamingContext.awaitTermination()
}

def saveEdges(runtimeConfig: RuntimeConfig, kafkaStream: DStream[(String, String)],
              uuidGenerator: Broadcast[TimeBasedGenerator]): Unit = {
  kafkaStream.flatMap { msg =>
    implicit val formats = DefaultFormats
    // Pair the sender's userId with each of its target ids: (userId, id)
    parse(msg._2).extract[List[EdgeMessage]]
      .flatMap(em => List.fill(em.ids.size)(em.userId) zip em.ids)
  }.map(edge => Edge(edge._1, edge._2))
    .saveAsTextFiles("tester", ".txt")
}
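
EdgeMessage isn't shown above; judging by how it is extracted and zipped, it presumably looks something like this (a guess on my part; Long ids are assumed because GraphX's Edge(srcId, dstId) takes Long vertex ids):

// Assumed shape of the JSON payload, inferred from extract[List[EdgeMessage]]
// and `List.fill(em.ids.size)(em.userId) zip em.ids` above.
case class EdgeMessage(userId: Long, ids: List[Long])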

Spark settings:

val conf = new SparkConf()
  .set("spark.mesos.executor.home", AppConfig.sparkHome)
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.streaming.kafka.maxRatePerPartition", "1")
  .set("spark.streaming.blockInterval", "500")
  .set("spark.cores.max", "36")

Relevant build.sbt extract:

"org.apache.spark" % "spark-streaming-kafka_2.10"  % "1.5.1",
"org.apache.spark" %% "spark-core" % "1.5.1",
"org.apache.spark" %% "spark-streaming" % "1.5.1",
"org.apache.spark" %% "spark-graphx" % "1.5.1",


  • Kafka version: 2.10-0.8.2.1

  • Resource manager: Mesos 0.23

  • Cluster details: 6 Spark workers, 6 Kafka brokers, 5-node ZooKeeper ensemble (on the same machines). 12 Kafka partitions.

  • Note: sparktmp and kafka-logs are generally located on the same spinning disks on each node.

Answer

The problem seems to be with the Mesos scheduler. I'm not sure exactly why it starts slowing jobs down like this. However, I restarted the Mesos cluster and now the saw-tooth processing times are gone.

As you can see here, the processing times are now much more stable:

[image: graph of batch processing times after the Mesos restart, with the spikes gone]
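
For anyone hitting something similar: one thing worth ruling out (an assumption on my part, not something I verified as the cause here) is Mesos fine-grained mode, which was still the default in Spark 1.5 and launches every Spark task as a Mesos task, adding per-task scheduling latency. Coarse-grained mode keeps executors alive for the lifetime of the job:

// General suggestion, not the verified fix in this case:
.set("spark.mesos.coarse", "true")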

