如何在流查询中执行动态SQL? [英] How to execute dynamic SQLs in streaming queries?
问题描述
我正在使用Spark结构化流,并处理来自Kafka的消息.有时,我的结果表如下所示,其中数据集中的每一行都有一个Spark SQL查询.
I'm using Spark Structured streaming, and processing messages from Kafka. At one point my result table looks something like below, where each line in the dataset has a Spark SQL query.
+----+--------------------+
|code| triggerSql|
+----+--------------------+
| US|SELECT * FROM def...|
| UK|SELECT * FROM def...|
+----+--------------------+
我需要执行每个查询并处理结果.但是,结构化流不允许将这些SQL收集到驱动程序端,并且我们不能在任何转换内打开新的SparkSession.
I need to execute each of these queries and process the results. However, structured streaming won't allow to collect these SQLs to driver side, and We can't open a new SparkSession inside any transformation.
val query = df3.writeStream.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, epochId: Long): Boolean = {
//..
}
override def process(value: Row): Unit = {
val triggerSqlString = value.getAs[String]("triggerSql")
val code = value.getAs[String]("value")
println("Code="+code+"; TriggerSQL="+triggerSqlString)
//TODO
}
override def close(errorOrNull: Throwable): Unit = {
// println("===> Closing..")
}
}).trigger(Trigger.ProcessingTime("5 seconds"))
.start()
有没有更好的替代方法可以在spark中动态执行这些SQL.
Is there any better alternative way to dynamically execute these SQL in spark.
推荐答案
tl; dr 使用以下示例显示了如何从批处理数据集中执行SQL查询:
The following sample shows how one could achieve execution of SQL queries from a batch dataset:
def sqlExecution(ds: Dataset[String], batchId: Long): Unit = {
ds.as[String].collect.foreach { s => sql(s).show }
}
spark
.readStream
.textFile("sqls")
.writeStream
.foreachBatch(sqlExecution)
.start
这篇关于如何在流查询中执行动态SQL?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!