Spark Streaming:将 Dstream 批次加入单个输出文件夹 [英] Spark Streaming : Join Dstream batches into single output Folder
问题描述
我正在使用 Spark Streaming 通过创建 StreamingContext 从 Twitter 获取推文:
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))代码>
I am using Spark Streaming to fetch tweets from twitter by creating a StreamingContext as :
val ssc = new StreamingContext("local[3]", "TwitterFeed",Minutes(1))
并将推特流创建为:val tweetStream = TwitterUtils.createStream(ssc, Some(new OAuthAuthorization(Util.config)),filters)
然后将其保存为文本文件tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
then saving it as text file
tweets.repartition(1).saveAsTextFiles("/tmp/spark_testing/")
问题是推文是根据批处理时间保存为文件夹的,但我需要将每个批处理的所有数据都放在同一个文件夹中.
and the problem is that the tweets are being saved as folders based on batch time but I need all the data of each batch in a same folder.
有什么解决办法吗?
谢谢
推荐答案
我们可以使用 Spark SQL 的新 DataFrame 保存 API 来完成此操作,该 API 允许附加到现有输出.默认情况下,saveAsTextFile 将无法保存到包含现有数据的目录(请参阅 https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes).https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations 介绍了如何设置用于 Spark Streaming 的 Spark SQL 上下文.
We can do this using Spark SQL's new DataFrame saving API which allow appending to an existing output. By default, saveAsTextFile, won't be able to save to a directory with existing data (see https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes ). https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations covers how to setup a Spark SQL context for use with Spark Streaming.
假设您使用 SQLContextSingleton 从指南中复制部分,生成的代码将类似于:
Assuming you copy the part from the guide with the SQLContextSingleton, The resulting code would look something like:
data.foreachRDD{rdd =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
// Convert your data to a DataFrame, depends on the structure of your data
val df = ....
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}
(注意上面的例子使用 JSON 来保存结果,但你也可以使用不同的输出格式).
(Note the above example used JSON to save the result, but you can use different output formats too).
这篇关于Spark Streaming:将 Dstream 批次加入单个输出文件夹的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!