Call a function with each element of a stream in Databricks
Problem description
I have a DataFrame stream in Databricks, and I want to perform an action on each element. On the net I found special-purpose methods, like writing it to the console or dumping it into memory, but I want to add some business logic and put some results into Redis.
To be more specific, this is how it would look in the non-stream case:
val someDataFrame = Seq(
  ("key1", "value1"),
  ("key2", "value2"),
  ("key3", "value3"),
  ("key4", "value4")
).toDF()

def someFunction(keyValuePair: (String, String)) = {
  println(keyValuePair)
}

someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))
But if someDataFrame is not a simple DataFrame but a streaming DataFrame (it is in fact coming from Kafka), the error message is this:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Could anyone please help me solve this problem?
Some important notes:

- I've read the relevant documentation, like Spark Streaming or Databricks Streaming, and a few other descriptions as well.
- I know that there must be something like start() and awaitTermination, but I don't know the exact syntax. The descriptions did not help.
- It would take pages to list all the possibilities I tried, so I would rather not provide them.
- I do not want to solve the specific problem of displaying the result; i.e. please do not provide a solution to this specific case. The someFunction would look like this:
val someData = readSomeExternalData()
if (/* condition containing keyValuePair and someData */) {
  doSomething(keyValuePair)
}
(The question What is the purpose of ForeachWriter in Spark Structured Streaming? does not provide a working example, therefore it does not answer my question.)
Recommended answer
Here is an example of reading with foreachBatch and saving every item to Redis using the streaming API.
(Related to the previous question: converting a DataFrame to RDD[(String, String)].)
// import spark and spark-redis
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.streaming._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._

// schema of the csv files
val userSchema = new StructType()
  .add("name", "string")
  .add("age", "string")

// create a data stream reader from a dir with csv files
val csvDF = spark
  .readStream
  .format("csv")
  .option("sep", ";")
  .schema(userSchema)
  .load("./data") // directory where the CSV files are

// redis connection configuration
val redisConfig = new RedisConfig(new RedisEndpoint("localhost", 6379))
implicit val readWriteConfig: ReadWriteConfig = ReadWriteConfig.Default

val query = csvDF
  .map(r => (r.getString(0), r.getString(1))) // convert each Row to a (name, age) pair, i.e. a Dataset[(String, String)]
  .writeStream // create a data stream writer
  .foreachBatch { (df: Dataset[(String, String)], _: Long) =>
    // save each micro-batch to redis after converting it to an RDD;
    // sc is the SparkContext, predefined in Databricks notebooks and spark-shell
    sc.toRedisKV(df.rdd)(redisConfig)
  }
  .start() // start processing

query.awaitTermination() // block the driver until the streaming query terminates
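
Since the question asks about per-element processing and references ForeachWriter, here is a minimal sketch of that alternative as well. It assumes the plain Jedis client is on the classpath; the RedisForeachWriter class, the host/port values, and the business-logic placeholder are illustrative, not part of the original answer.

import org.apache.spark.sql.ForeachWriter
import redis.clients.jedis.Jedis

// A per-element writer. Spark serializes this object to the executors,
// so the (non-serializable) connection is created in open(), not in the constructor.
class RedisForeachWriter(host: String, port: Int) extends ForeachWriter[(String, String)] {
  @transient private var jedis: Jedis = _

  // called once per partition and epoch; return true to process the partition's rows
  override def open(partitionId: Long, epochId: Long): Boolean = {
    jedis = new Jedis(host, port)
    true
  }

  // called once per element; this is where the business logic would go
  override def process(record: (String, String)): Unit = {
    val (key, value) = record
    // e.g. check a condition containing the pair and some external data here
    jedis.set(key, value)
  }

  // called when the partition is done (or has failed); release resources
  override def close(errorOrNull: Throwable): Unit = {
    if (jedis != null) jedis.close()
  }
}

val perElementQuery = csvDF
  .map(r => (r.getString(0), r.getString(1)))
  .writeStream
  .foreach(new RedisForeachWriter("localhost", 6379))
  .start()

perElementQuery.awaitTermination()

The trade-off: foreachBatch operates on whole micro-batches, which suits connector APIs like spark-redis that consume RDDs or DataFrames, while foreach/ForeachWriter gives element-level control at the cost of managing connections yourself.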