Spark Streaming - Calculating stats from key-value pairs grouped by keys

Question

Background: I'm using Spark Streaming to stream events from Kafka, which arrive as comma-separated key-value pairs. Here is an example of how events are streamed into my Spark application:

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100

Output:

For a given batch interval, I want to calculate different metrics (average, count, etc.) grouped by different keys in the stream, for example:

  1. Average response time by Key1, Key2 (responseTime is one of the keys in every event)
  2. Count by Key1, Key2
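
For the sample batch above, each (Key1, Key2) pair appears exactly once, so the expected per-batch result would look roughly like:

((Value1, Value2), count=1, avgResponseTime=200.0)
((Value5, Value6), count=1, avgResponseTime=150.0)
((Value9, Value10), count=1, avgResponseTime=100.0)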

What I've tried so far:

val stream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet)

val pStream = stream.persist()

val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs = events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.

Update - 03/04: The keys Key1, Key2, ... can arrive out of order in the incoming stream.

Appreciate your inputs/hints.

Answer

One possible solution is something like this:

  • create a case class representing each record so we don't have to deal with tuples:

case class Record(
  key1: String, key2: String, key3: String, key4: String, rt: Double)

  • use a regular expression to parse each record and drop malformed entries:

    import scala.util.matching.Regex
    
    val recordPattern = new Regex(
      "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++
      "responseTime=([0-9]+)$"
    )
    
    // pStream carries (key, message) tuples from Kafka, so parse the message value
    val records = pStream.map(_._2).map {
      case recordPattern(key1, key2, key3, key4, rt) =>
        Some(Record(key1, key2, key3, key4, rt.toDouble))
      case _ => None
    }.flatMap(x => x) // Drop malformed
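
    As a quick sanity check (a sketch of mine, not part of the original answer), the pattern pulls the expected fields out of the first sample event from the question:

    val sample = "Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200"
    sample match {
      case recordPattern(k1, k2, k3, k4, rt) =>
        println(Record(k1, k2, k3, k4, rt.toDouble))  // Record(Value1,Value2,Value3,Value4,200.0)
      case _ => println("no match")
    }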
    

  • reshape the data into key-value pairs:

    val pairs = records.map(r => ((r.key1, r.key2), r.rt))
    

  • create a partitioner and use StatCounter to aggregate the statistics:

    import org.apache.spark.util.StatCounter
    import org.apache.spark.HashPartitioner
    
    val partitioner: HashPartitioner = ???
    
    val stats = pairs.combineByKey[StatCounter](
      StatCounter(_), _ merge _,  _ merge _, partitioner
    )
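
    The ??? above is a placeholder. One plausible way to fill it in (an assumption on my part, not prescribed by the answer) is a HashPartitioner sized to the default parallelism; StatCounter then accumulates count, mean, min, max and variance per key in a single pass:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.util.StatCounter
    
    // A reasonable default for the partitioner placeholder above
    val partitioner = new HashPartitioner(ssc.sparkContext.defaultParallelism)
    
    // What StatCounter accumulates, illustrated locally with three response
    // times that hypothetically share the same (Key1, Key2) pair
    val s = StatCounter(200.0).merge(150.0).merge(100.0)
    // s.count == 3, s.mean == 150.0, s.max == 200.0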
    

  • extract the fields of interest:

    stats.mapValues(s => (s.count, s.mean))
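
    To actually see the per-batch results you still need to output the DStream and start the streaming context; a minimal sketch (the val name results is mine, not from the answer):

    val results = stats.mapValues(s => (s.count, s.mean))
    results.print()  // e.g. ((Value1,Value2),(1,200.0)) for the sample batch
    
    ssc.start()
    ssc.awaitTermination()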
    

  • you can also try something like this for unordered data, although I would strongly suggest fixing things upstream:

    import scala.util.Try
    
    val kvPattern = "(\\w+)=(\\w+)".r
    // Again parse the message value of the (key, message) tuple
    val pairs = pStream.map(_._2).map(line => {
      val kvs = kvPattern.findAllMatchIn(line)
        .map(m => (m.group(1), m.group(2))).toMap
    
      // This will discard any malformed lines
      // (lack of Key1, Key2, or a missing / invalid responseTime)
      Try((
        (kvs("Key1"), kvs("Key2")), 
        kvs("responseTime").toDouble
      ))
    
    }).flatMap(_.toOption)
    

    and proceed as before.
