Spark Streaming on Kafka: print different cases for different values from Kafka


Question

I am stating my scenario below: 10,000 servers are sending DF (disk free) size data. (Every 5 seconds, 10,000 inputs arrive.)

If for any server the DF usage is more than 70%, print a message to increase the ROM size by 20%. If for any server the DF usage is less than 30%, print a message to decrease the ROM size by 25%.

I am providing code that takes messages from Kafka, filters for lines containing "%", and applies toUpperCase(). This code is only a reference for my Kafka setup details.

Can anyone please help me with this scenario?

package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordFind {
    def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
        val batchIntervalSeconds = 2

        val ssc = new StreamingContext(conf, Seconds(10))

        val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
            "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

        val filteredStream: DStream[(String, String)] = kafkaStream.filter(record =>
                record._2.contains("%")) // TODO : pattern matching here
        val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase())

        outputDStream.print()

        ssc.start()
        ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
    }
}

Please help me with code that satisfies this scenario.

Sample input

Filesystem     1K-blocks   Used      Available   Use%   Mounted on
/dev/sda1      132239776   6210884   119311504   5%     /
tmpfs          4021876     0         4021876     0%     /dev/shm

Sample output: if Use% > 70 for any row, print the message "Increase ROM size by 20%"; if Use% < 30 for any row, print the message "Decrease ROM size by 25%".

I also have to put the output into Elasticsearch, and it is giving an error:

package rnd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

object WordFind {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val sc = new SparkContext(conf)
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2

    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
    val kafkaStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "localhost:2181",
      "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem"))  // eliminate header
      .map(_._2.split("\\s+"))               // split on whitespace
    val outputDStream: DStream[String] = filteredStream.map { row =>
      val useIdx = row.length - 2
      // if Use% > 70 for any row -> Message: Increase ROM size by 20%
      // if Use% < 30 for any row -> Message: Decrease ROM size by 25%
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => "Increase ROM size by 20%"
        case x if x < 30 => "Decrease ROM size by 25%"
        case _ => "Undefined"
      }
    }

    outputDStream.print()
    import org.elasticsearch.spark.sql._
    outputDStream.saveToEs("dfvalueoperations_v1/kwc") // this line does not compile, see the error below

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)
    // This starts the streaming context in the background.
    ssc.start()
    // Wait for some time (5x the batch interval) before the job is stopped.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}

The error is:

Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String]
outputDStream.saveToEs("kafkamessage_v1/kwc")

Answer

I have made a few assumptions to get the required output.

1.) Headers may appear in between, hence a filter is used to remove the header line:

Filesystem 1K-blocks Used Available Use% Mounted on

2.) Since the Filesystem column may contain spaces inside the string, I have extracted Use% using the second index from the end of the row. (If this doesn't work, try a group regex to achieve the same; see the sketch after this list.)

3.) The case where the use percentage is between 30 and 70 is not defined, hence the output message contains "Undefined" for such rows.
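
If the positional index turns out to be unreliable, a capture-group regex over the raw line is an alternative. A minimal sketch, assuming the usage percentage of interest is the last "<digits>%" token right before the mount point (the pattern and helper name are my own, not from the original answer):

// Sketch: pull the Use% value out of a df output line with a regex group.
// Assumes the percentage of interest is the last "<digits>%" before the mount point.
val usePercentRegex = """(\d+)%\s+\S+$""".r

def extractUsePercent(line: String): Option[Int] =
  usePercentRegex.findFirstMatchIn(line).map(_.group(1).toInt)

// extractUsePercent("/dev/sda1 132239776 6210884 119311504 5% /")   // Some(5)
// extractUsePercent("tmpfs 4021876 0 4021876 0% /dev/shm")          // Some(0)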

A sample input/output (using Array[String]):

 scala> val input =
           |       """|Filesystem    512-blocks      Used Available Capacity iused      ifree %iused  Mounted on
           |          |/dev/disk1     234618880 154868528  79238352    67% 1784543 4293182736    0%   /
           |          |devfs                364       364         0   100%     630          0  100%   /dev
           |          |map -hosts             0         0         0   100%       0          0  100%   /net
           |          |map auto_home          0         0         0   100%       0          0  100%   /home""".stripMargin


 scala> val inputStr: Array[Array[String]] = input.split("\n").filter(!_.contains("Filesystem")).map(_.split("\\s+"))

 scala> val outputMessage = inputStr.map {
      |       row =>
      |         // Assuming the position is always second from last
      |         val elementPosition = row.length - 2 
      |         // if Use%>70 for any case> Message: Increase ROM size by 20%
      |         // if Use%<30% for any case> Message: Decrease ROM size by 25%
      |         val usePercent = row(elementPosition).replace("%", "").toInt
      |         usePercent match {
      |           case x if x > 70 => (usePercent, "Increase ROM size by 20%")
      |           case x if x < 30 => (usePercent, "Decrease ROM size by 25%")
      |           case _ => (usePercent, "Undefined")
      |         }
      |     }

 scala> outputMessage.foreach(println)
 (0,Decrease ROM size by 25%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)
 (100,Increase ROM size by 20%)

This code works for an Array[String]; please test it for a ReceiverInputDStream[(String, String)]. The code should be similar to:

 val filteredStream: DStream[Array[String]] = kafkaStream
       .filter(!_._2.contains("Filesystem"))  // eliminate header
       .map(_._2.split("\\s+"))  // split with space
 val outputDStream: DStream[String] = filteredStream.map {
       row =>
         val useIdx = row.length - 2
         // if Use%>70 for any case> Message: Increase ROM size by 20%
         // if Use%<30% for any case> Message: Decrease ROM size by 25%
         val usePercent = row(useIdx).replace("%", "").toInt
         usePercent match {
           case x if x > 70 => "Increase ROM size by 20%"
           case x if x < 30 => "Decrease ROM size by 25%"
           case _ => "Undefined"
         }
     }
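
On the Elasticsearch error in the question: import org.elasticsearch.spark.sql._ only adds saveToEs to DataFrames/Datasets, which is why it is not a member of DStream[String]. Below is a hedged sketch of two common alternatives from the elasticsearch-hadoop connector, assuming the matching elasticsearch-spark artifact is on the classpath; the Map field name "message" is my own choice:

// Option 1: write each micro-batch RDD; org.elasticsearch.spark._ adds saveToEs on RDDs.
import org.elasticsearch.spark._

outputDStream.foreachRDD { rdd =>
  // Wrap every message in a Map so it is indexed as a JSON document.
  rdd.map(msg => Map("message" -> msg)).saveToEs("dfvalueoperations_v1/kwc")
}

// Option 2 (elasticsearch-hadoop 5.0+): the streaming package enriches DStreams directly.
// import org.elasticsearch.spark.streaming._
// outputDStream.map(msg => Map("message" -> msg)).saveToEs("dfvalueoperations_v1/kwc")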

Hope this helps.
