How to convert RDD to DataFrame in Spark Streaming, not just Spark
Question
How can I convert an RDD to a DataFrame in Spark Streaming, not just in Spark?

I saw this example, but it requires a SparkContext:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? That looks too crazy... So how do I deal with this issue? My final goal (if it might be useful) is to save the DataFrame to Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"), which is not possible for an RDD without first converting it to a DataFrame (as far as I know).
myDstream.foreachRDD { rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
}
Recommended Answer
Create the sqlContext outside foreachRDD. Once you convert the rdd to a DataFrame using the sqlContext, you can write it to S3.
For example:
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>
val df = rdd.toDF()
// DataFrameWriter has no saveAsTextFile; use save() (or df.write.json(path))
df.write.format("json").save("s3://iiiii/ttttt.json")
}
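The answer above targets Spark 1.x, where SQLContext was the entry point. As a hedged sketch (the stream source, column name, and S3 path here are illustrative placeholders, not from the original question), in Spark 2.x and later the same pattern is usually written with SparkSession, obtained per micro-batch via SparkSession.builder.getOrCreate(), which reuses the existing session on the driver:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch for Spark 2.x+; the socket source and output path are assumptions.
val conf = new SparkConf().setMaster("local[2]").setAppName("My App")
val ssc = new StreamingContext(conf, Seconds(10))
val myDstream = ssc.socketTextStream("localhost", 9999)

myDstream.foreachRDD { rdd =>
  // getOrCreate reuses (or lazily creates) the session; this closure runs on the driver.
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  if (!rdd.isEmpty) {
    val df = rdd.toDF("value")
    // Append mode so each micro-batch does not overwrite the previous output.
    df.write.mode("append").json("s3a://iiiii/ttttt")
  }
}
ssc.start()
ssc.awaitTermination()
```

Writing with append mode matters here: with the default error-if-exists mode, the second micro-batch would fail because the output path already exists.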
Update:
You can even create the sqlContext inside foreachRDD, since that code is executed on the driver.