星火流:加入DSTREAM分批进入单一的输出文件夹 [英] Spark Streaming : Join Dstream batches into single output Folder
问题描述
我使用的Spark流通过创建的StreamingContext作为获取来自Twitter的鸣叫:
VAL SSC =新的StreamingContext(本地[3],通过twitterfeed,分(1) )
I am using Spark Streaming to fetch tweets from twitter by creating a StreamingContext as :
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
和如创建Twitter的数据流: VAL tweetStream = TwitterUtils.createStream(SSC,有的(新OAuthAuthorization(Util.config)),过滤器)
and creating twitter stream as :
val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其保存为文本文件 tweets.repartition(1).saveAsTextFiles(/ tmp目录/ spark_testing /)
then saving it as text file
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
和问题是,在微博被保存为基于批处理时间的文件夹,但我需要在同一文件夹中的每个批次的所有数据。
and the problem is that the tweets are being saved as folders based on batch time but I need all the data of each batch in a same folder.
是否有任何解决方法吗?
Is there any workaround for it?
感谢
推荐答案
我们可以做到这一点使用SQL星火新的数据帧保存的API,它允许追加到现有输出。默认情况下,saveAsTextFile,将无法保存对现有数据的目录(见的 https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes )。 <一href=\"https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations\" rel=\"nofollow\">https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations涵盖了如何设置与星火流使用SQL星火环境。
We can do this using Spark SQL's new DataFrame saving API which allow appending to an existing output. By default, saveAsTextFile, won't be able to save to a directory with existing data (see https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes ). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations covers how to setup a Spark SQL context for use with Spark Streaming.
假设你从与SQLContextSingleton,由此产生的code看起来像引导复制的部分:
Assuming you copy the part from the guide with the SQLContextSingleton, The resulting code would look something like:
data.foreachRDD{rdd =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
// Convert your data to a DataFrame, depends on the structure of your data
val df = ....
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}
(注意使用JSON保存结果上面的例子,但可以使用不同的输出格式太)。
(Note the above example used JSON to save the result, but you can use different output formats too).
这篇关于星火流:加入DSTREAM分批进入单一的输出文件夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!