How to use foreachRDD in legacy Spark Streaming


Question

I am getting an exception while using foreachRDD for my CSV data processing. Here is my code:

  case class Person(name: String, age: Long)

  val conf = new SparkConf()
  conf.setMaster("local[*]")
  conf.setAppName("CassandraExample").set("spark.driver.allowMultipleContexts", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  val smDstream = ssc.textFileStream("file:///home/sa/testFiles")

  smDstream.foreachRDD((rdd, time) => {
    val peopleDF = rdd.map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    peopleDF.createOrReplaceTempView("people")
    val teenagersDF = spark.sql("insert into table devDB.stam SELECT name, age FROM people WHERE age BETWEEN 13 AND 29")
    //teenagersDF.show
  })
  ssc.checkpoint("hdfs://go/hive/warehouse/devDB.db")
  ssc.start()

I am getting the following error:

  java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
  org.apache.spark.streaming.StreamingContext
  Serialization stack:
   - object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@1263422a)
   - field (class: $iw, name: ssc, type: class org.apache.spark.streaming.StreamingContext)

Please help.

Answer

The question does not really make sense anymore, in that DStreams are being deprecated / abandoned.

There are a few things to consider in the code, so the exact question is hard to glean. That said, I had to ponder a bit as well, as I am not a serialization expert.

You can find a few posts of people trying to write to a Hive table directly as opposed to a path. In my answer I use one approach, but you can equally use your Spark SQL approach of writing via a TempView; that is all possible.
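For reference, here is a minimal, hedged sketch of that TempView route applied to the question's code (it assumes the cluster has Hive support and that the devDB.stam table from the question exists). The key point is obtaining the session from the RDD's own context inside foreachRDD rather than capturing ssc or an outer spark in the closure:

  import org.apache.spark.sql.SparkSession

  smDstream.foreachRDD { rdd =>
    if (!rdd.isEmpty) {
      // Build/fetch the session from the RDD's context so the closure does not
      // drag the StreamingContext along (the cause of the NotSerializableException).
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      val peopleDF = rdd.map(_.split(","))
        .map(a => Person(a(0), a(1).trim.toInt))
        .toDF()

      peopleDF.createOrReplaceTempView("people")
      // devDB.stam is the asker's target table; adjust to your own schema.
      spark.sql("insert into table devDB.stam SELECT name, age FROM people WHERE age BETWEEN 13 AND 29")
    }
  }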

I simulated the input from a QueueStream, so no split needs to be applied. You can adapt this to your own situation if you follow the same "global" approach. I elected to write to a parquet file that gets created if needed. You can create your tempView and then use spark.sql as per your initial approach.

The output operations on DStreams are:

  • print()
  • saveAsTextFiles(prefix, [suffix])
  • saveAsObjectFiles(prefix, [suffix])
  • saveAsHadoopFiles(prefix, [suffix])
  • foreachRDD(func)

foreachRDD

The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.

It mentions saving to files, but foreachRDD can do whatever you want, albeit I assume the idea is to push to external systems. Saving to files is quicker in my view than going through the steps to write to a table directly. With Streaming you want to offload data as soon as possible, as volumes are typically high.
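As a sketch of that file route, the write inside foreachRDD could target a plain parquet path instead of a table; QS and Person below refer to the names used in the full code further down, and the output path is a placeholder:

  // Hypothetical sketch: append each micro-batch straight to a parquet path.
  QS.foreachRDD { rdd =>
    if (!rdd.isEmpty) {
      val df = rdd.flatMap(identity).map(f => Person(f._1, f._2)).toDF()
      df.write.mode(SaveMode.Append).parquet("/tmp/so_quest_people")  // placeholder path
    }
  }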

Two steps:

In a separate class to the Streaming class - run under Spark 2.4:

case class Person(name: String, age: Int)

Then the Streaming logic you need to apply - you may need some imports that I have in my notebook anyway, as I ran this under Databricks:

import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import org.apache.spark.sql.SaveMode

val spark = SparkSession
           .builder
           .master("local[4]")
           .config("spark.driver.cores", 2)
           .appName("forEachRDD")
           .getOrCreate()

import spark.implicits._   // needed for .toDF() on the RDD of Person below

val sc = spark.sparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

// Simulated input: a queue of RDDs, each holding one batch of (name, age) lists.
val rddQueue = new mutable.Queue[RDD[List[(String, Int)]]]()
val QS = ssc.queueStream(rddQueue)

QS.foreachRDD(q => {
   if(!q.isEmpty) {
      // Flatten the lists of (name, age) tuples, map to Person and convert to a DataFrame.
      val q_flatMap = q.flatMap{x => x}
      val q_withPerson = q_flatMap.map(field => Person(field._1, field._2))
      val df = q_withPerson.toDF()

      // Append each micro-batch to a parquet-backed table, created on first write.
      df.write
        .format("parquet")
        .mode(SaveMode.Append)
        .saveAsTable("SO_Quest_BigD")
   }
 }
)

ssc.start()
// Feed three batches into the queue; each becomes one micro-batch of the stream.
for (c <- List(List(("Fred",53), ("John",22), ("Mary",76)), List(("Bob",54), ("Johnny",92), ("Margaret",15)), List(("Alfred",21), ("Patsy",34), ("Sylvester",7)) )) {
   rddQueue += ssc.sparkContext.parallelize(List(c))
}
ssc.awaitTermination()
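Once a few batches have been processed, you can sanity-check the appended table with a plain query, for example:

  // Verify the rows appended by the streaming job (table name as used above).
  spark.sql("SELECT name, age FROM SO_Quest_BigD ORDER BY age").show(false)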
