Spark Streaming on Kafka: print different cases for different values from Kafka
Question
I am stating my scenario below: 10,000 servers are sending DF size data (every 5 seconds, 10,000 inputs arrive).
If for any server the DF size used is more than 70%, print "Increase the ROM size by 20%". If for any server the DF size used is less than 30%, print "Decrease the ROM size by 25%".
I am providing code that takes messages from Kafka, matches them against "%", and applies toUpperCase. This code is just a reference for my Kafka details.
Can anyone please help me with this scenario?
package rnd

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based stream of (key, message) pairs, read via ZooKeeper at localhost:2181
    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, "localhost:2181",
        "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    // Keep only messages containing "%" and upper-case them
    val filteredStream: DStream[(String, String)] =
      kafkaStream.filter(record => record._2.contains("%")) // TODO: pattern matching here
    val outputDStream: DStream[String] = filteredStream.map(record => record._2.toUpperCase)

    outputDStream.print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}
Please help me with code that satisfies this scenario.
Sample input:
Filesystem     1K-blocks    Used   Available Use% Mounted on
/dev/sda1      132239776  6210884  119311504   5% /
tmpfs            4021876        0    4021876   0% /dev/shm
Sample output: if Use% > 70 for any row, print the message "Increase ROM size by 20%"; if Use% < 30 for any row, print the message "Decrease ROM size by 25%".
I also have to write the output to Elasticsearch, and it is giving an error:
package rnd

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object WordFind {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val sc = new SparkContext(conf)
    val checkpointDir = "/usr/local/kafka/kafka_2.11-0.11.0.2/checkpoint/"
    val batchIntervalSeconds = 2
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    val kafkaStream: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, "localhost:2181",
        "spark-streaming-consumer-group", Map("wordcounttopic" -> 5))

    val filteredStream: DStream[Array[String]] = kafkaStream
      .filter(!_._2.contains("Filesystem")) // eliminate the header row
      .map(_._2.split("\\s+"))              // split on whitespace

    val outputDStream: DStream[String] = filteredStream.map { row =>
      // Use% is assumed to be the second field from the end
      val useIdx = row.length - 2
      // if Use% > 70: Increase ROM size by 20%
      // if Use% < 30: Decrease ROM size by 25%
      val usePercent = row(useIdx).replace("%", "").toInt
      usePercent match {
        case x if x > 70 => "Increase ROM size by 20%"
        case x if x < 30 => "Decrease ROM size by 25%"
        case _           => "Undefined"
      }
    }

    outputDStream.print()
    import org.elasticsearch.spark.sql._
    outputDStream.saveToEs("dfvalueoperations_v1/kwc") // this line fails to compile

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))
    ssc.checkpoint(checkpointDir)

    // This starts the streaming context in the background.
    ssc.start()
    // Wait for 5 * batchIntervalSeconds before timing out.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 5 * 1000)
  }
}
error: Error:(51, 21) value saveToEs is not a member of org.apache.spark.streaming.dstream.DStream[String]
    outputDStream.saveToEs("kafkamessage_v1/kwc")
Answer
I have made a few assumptions to get the required output.
1.) Headers may appear in between the data rows, hence a filter is used to remove the header line:

Filesystem 1K-blocks Used Available Use% Mounted on
2.) Since the Filesystem column may have spaces inside the string, I have extracted the Use% using the second index from the last. (If this doesn't work, please try a group regex to achieve the same; a sketch follows below.)
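As a sketch of that group-regex alternative (illustrative and hedged: the helper name is made up, and the pattern assumes the wanted value is the last "<digits>%" token before the mount point, which holds for both sample layouts shown here):

import scala.util.matching.Regex

val percentRe: Regex = """(\d+)%""".r

// Mirrors the "second index from the last" logic above: in both sample df
// layouts, the field just before "Mounted on" is the last percentage in the row.
def usePercent(line: String): Option[Int] =
  percentRe.findAllMatchIn(line).map(_.group(1).toInt).toSeq.lastOption

// e.g. usePercent("/dev/sda1 132239776 6210884 119311504 5% /") == Some(5)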
3.) The case for a use percentage between 30 and 70 is not defined, hence the output message contains "Undefined" for such cases.
A sample input and output (using Array[String]):
scala> val input =
| """|Filesystem 512-blocks Used Available Capacity iused ifree %iused Mounted on
| |/dev/disk1 234618880 154868528 79238352 67% 1784543 4293182736 0% /
| |devfs 364 364 0 100% 630 0 100% /dev
| |map -hosts 0 0 0 100% 0 0 100% /net
| |map auto_home 0 0 0 100% 0 0 100% /home""".stripMargin
scala> val inputStr: Array[Array[String]] = input.split("\n").filter(!_.contains("Filesystem")).map(_.split("\\s+"))
scala> val outputMessage = inputStr.map {
| row =>
| // Assuming the position is always second from last
| val elementPosition = row.length - 2
| // if Use%>70 for any case> Message: Increase ROM size by 20%
| // if Use%<30% for any case> Message: Decrease ROM size by 25%
| val usePercent = row(elementPosition).replace("%", "").toInt
| usePercent match {
| case x if x > 70 => (usePercent, "Increase ROM size by 20%")
| case x if x < 30 => (usePercent, "Decrease ROM size by 25%")
| case _ => (usePercent, "Undefined")
| }
| }
scala> outputMessage.foreach(println)
(0,Decrease ROM size by 25%)
(100,Increase ROM size by 20%)
(100,Increase ROM size by 20%)
(100,Increase ROM size by 20%)
This code works for an Array[String]; please test it for a ReceiverInputDStream[(String, String)]. (Note that in the macOS-style sample above, the second field from the last is the %iused column, which is why /dev/disk1 prints 0 rather than its 67% capacity.) The code must be similar to:
val filteredStream: DStream[Array[String]] = kafkaStream
.filter(!_._2.contains("Filesystem")) // eliminate header
.map(_._2.split("\\s+")) // split with space
val outputDStream: DStream[String] = filteredStream.map {
row =>
val useIdx = row.length - 2
// if Use%>70 for any case> Message: Increase ROM size by 20%
// if Use%<30% for any case> Message: Decrease ROM size by 25%
val usePercent = row(useIdx).replace("%", "").toInt
usePercent match {
case x if x > 70 => "Increase ROM size by 20%"
case x if x < 30 => "Decrease ROM size by 25%"
case _ => "Undefined"
}
}
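One more note on the Elasticsearch error from the question: import org.elasticsearch.spark.sql._ brings saveToEs into scope for SQL types, not for DStreams. A hedged sketch of the likely fix, assuming the elasticsearch-spark artifact with Spark Streaming support (ES-Hadoop 5.x or later) is on the classpath:

// Streaming-specific import: adds saveToEs to DStream (ES-Hadoop 5.x+)
import org.elasticsearch.spark.streaming._

// Index/type name copied from the question's code
outputDStream.saveToEs("dfvalueoperations_v1/kwc")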
Hope this helps.