Spark Streaming on Kafka: print different cases for different values from Kafka


Question

I am stating my scenario below: 10,000 servers are sending DF size data (every 5 seconds, 10,000 inputs arrive).

If for any server the DF size used is more than 70%, print "Increase the ROM size by 20%". If for any server the DF size used is less than 30%, print "Decrease the ROM size by 25%".
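For concreteness, the rule can be written as a pure function. A minimal sketch (the name romAdvice is hypothetical, and the question leaves the 30 to 70 range unspecified):

 // Minimal sketch of the requested rule; romAdvice is not from the question.
 def romAdvice(dfUsedPercent: Int): String =
   if (dfUsedPercent > 70) "Increase the ROM size by 20%"
   else if (dfUsedPercent < 30) "Decrease the ROM size by 25%"
   else "Undefined" // behaviour for 30 to 70 is unspecified in the question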

I am providing code that takes messages from Kafka, matches them against "%", and applies toUpperCase(). This code is just a reference for my Kafka details.

Can anyone please help me with this scenario?

package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based stream: (key, message) pairs read via ZooKeeper at localhost:2181
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
      ssc, "localhost:2181", "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    // Keep only messages containing "%" and upper-case them
    val filteredStream: DStream[(String, String)] = kafkaStream.filter(record =>
      record._2.contains("%")) // TODO: pattern matching here
    val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase())

    outputDStream.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

Please help me with code that satisfies this scenario.

Sample input

 Filesystem     1K-blocks  Used     Available  Use%  Mounted on
 /dev/sda1      132239776  6210884  119311504  5%    /
 tmpfs          4021876    0        4021876    0%    /dev/shm
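For reference, splitting one such row on whitespace puts Use% second from the end (assuming the mount point itself contains no spaces), which is what the parsing further down relies on:

 // One df row, split on whitespace; Use% is the second token from the end.
 val row = "/dev/sda1 132239776 6210884 119311504 5% /".split("\\s+")
 val usePercent = row(row.length - 2).replace("%", "").toInt // 5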

Sample output: if Use% > 70 for any case, message: "Increase ROM size by 20%"; if Use% < 30 for any case, message: "Decrease ROM size by 25%".

I also have to put that into Elasticsearch, and it is giving an error:

package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))

    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem")) // eliminate header
      .map(_._2.split("\\s+"))              // split on whitespace

    val outputDStream: DStream[String] = filteredStream.map { row =>
      // Use% is assumed to be the second field from the end
      val useIdx = row.length - 2
      // if Use% > 70: Message: Increase ROM size by 20%
      // if Use% < 30: Message: Decrease ROM size by 25%
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => "Increase ROM size by 20%"
        case x if x < 30 => "Decrease ROM size by 25%"
        case _           => "Undefined"
      }
    }

    outputDStream.print()

    import org.elasticsearch.spark.sql._
    outputDStream.saveToEs("dfvalueoperations_v1/kwc") // this line fails to compile, see error below

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)

    // This starts the streaming context in the background.
    ssc.start()
    // Wait for 5 times the batch interval before the timeout kicks in.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

error: Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String]
       outputDStream.saveToEs("kafkamessage_v1/kwc")

Answer

I have made a few assumptions to get the required output.

1.) Headers may appear in between, hence a filter is used to remove the header:

Filesystem 1K-blocks Used Available Use% Mounted on

2.) Since the Filesystem column may contain spaces inside the string, I have extracted Use% using the second index from the last (if this doesn't work, try a group regex to achieve the same; see the sketch after this list).

3.) The case where the use percentage is between 30 and 70 is not defined, hence the output message contains "Undefined" for such cases.
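A minimal sketch of the group-regex alternative mentioned in point 2. It assumes Use% is the first "<digits>%" token on the line, which holds for the df samples above but should be checked against your actual output:

 // Hypothetical group-regex variant: capture the digits of the first
 // "<digits>%" token on the line and treat them as Use%.
 val usePercentRe = """(\d{1,3})%""".r

 def usePercent(line: String): Option[Int] =
   usePercentRe.findFirstMatchIn(line).map(_.group(1).toInt)

 // usePercent("/dev/sda1 132239776 6210884 119311504 5% /") == Some(5)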

A sample input/output (using Array[String]):

 scala> val input =
      |   """|Filesystem    512-blocks      Used Available Capacity iused      ifree %iused  Mounted on
      |      |/dev/disk1     234618880 154868528  79238352    67% 1784543 4293182736    0%   /
      |      |devfs                364       364         0   100%     630          0  100%   /dev
      |      |map -hosts             0         0         0   100%       0          0  100%   /net
      |      |map auto_home          0         0         0   100%       0          0  100%   /home""".stripMargin


 scala> val inputStr: Array[Array[String]] = input.split("\n").filter(!_.contains("Filesystem")).map(_.split("\\s+"))

 scala> val outputMessage = inputStr.map {
      |       row =>
      |         // Assuming the position is always second from last
      |         val elementPosition = row.length - 2 
      |         // if Use%>70 for any case> Message: Increase ROM size by 20%
      |         // if Use%<30% for any case> Message: Decrease ROM size by 25%
      |         val usePercent = row(elementPosition).replace("%", "").toInt
      |         usePercent match {
      |           case x if x > 70 => (usePercent, "Increase ROM size by 20%")
      |           case x if x < 30 => (usePercent, "Decrease ROM size by 25%")
      |           case _ => (usePercent, "Undefined")
      |         }
      |     }

 scala> outputMessage.foreach(println)
 (0,Decrease ROM size by 25%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)

This code works for an Array[String]; please test it against a ReceiverInputDStream[(String, String)]. The code must be similar to:

 val filteredStream: DStream[Array[String]] = kafkaStream
       .filter(!_._2.contains("Filesystem"))  // eliminate header
       .map(_._2.split("\\s+"))  // split with space
 val outputDStream: DStream[String] = filteredStream.map {
       row =>
         val useIdx = row.length - 2
         // if Use%>70 for any case> Message: Increase ROM size by 20%
         // if Use%<30% for any case> Message: Decrease ROM size by 25%
         val usePercent = row(useIdx).replace("%", "").toInt
         usePercent match {
           case x if x > 70 => "Increase ROM size by 20%"
           case x if x < 30 => "Decrease ROM size by 25%"
           case _ => "Undefined"
         }
     }
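One note on the saveToEs compile error from the question: import org.elasticsearch.spark.sql._ only adds saveToEs to DataFrames and RDDs. For a DStream, elasticsearch-hadoop ships a separate streaming package (available since ES-Hadoop 5.0, if I remember correctly). A minimal, untested sketch:

 // Assumption: elasticsearch-hadoop >= 5.0 on the classpath.
 import org.elasticsearch.spark.streaming._ // adds saveToEs to DStream

 // outputDStream is the DStream[String] built above
 outputDStream.saveToEs("dfvalueoperations_v1/kwc")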

Hope this helps.
