Querying Delta Lake from Inside of UDF in Databricks
Question
I need to perform some queries against a table from inside a UDF in Structured Streaming. The problem is that if I try to use spark.sql inside the UDF, I get a NullPointerException. What is the best approach here?
Basically, I need to stream from one table and then use that data to perform some range queries against another table.
For example:
val appleFilter = udf((appleId: String) => {
  val query = "select count(*) from appleMart where appleId='" + appleId + "'"
  val appleCount = spark.sql(query).collect().head.getLong(0)
  appleCount > 0
})
val newApple = apples.filter(appleFilter($"appleId"))
Answer
This is not really a correct approach for this task - you shouldn't issue separate queries from inside a UDF, as Spark won't be able to parallelize or optimize them.
The better way is to do a join between your streaming DataFrame and the appleMart DataFrame - this allows Spark to optimize all operations. As I understand from your code, you just need to check whether an apple with the given ID exists. In that case you can do an inner join - this keeps only the IDs for which rows exist in appleMart, something like this:
val appleMart = spark.read.format("delta").load("path_to_delta")
val newApple = apples.join(appleMart, apples("appleId") === appleMart("appleId"))
If for some reason you need to keep apples entries that don't exist in appleMart, you can use a left join instead...
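A minimal sketch of the left-join variant, using small in-memory DataFrames as stand-ins for the streaming apples frame and the appleMart Delta table (the IDs and the local SparkSession here are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("left-join-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-ins for the streaming `apples` frame and the `appleMart` Delta table
val apples    = Seq("a1", "a2", "a3").toDF("appleId")
val appleMart = Seq("a1", "a3").toDF("appleId")

// A left join keeps every row of `apples`; rows with no match in
// `appleMart` simply get nulls for the right-hand columns
val newApple = apples.join(appleMart, Seq("appleId"), "left")
newApple.show()
```

Passing the join key as Seq("appleId") also avoids the duplicate appleId column you would get with the apples("appleId") === appleMart("appleId") form.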
P.S. If appleMart doesn't change very often, you can cache it. Although for streaming jobs, something like Cassandra could be a better lookup table from a performance standpoint.
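A caching sketch for that suggestion, again with an in-memory stand-in for the spark.read.format("delta").load("path_to_delta") call above (the local SparkSession and sample data are assumptions for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cache-sketch")
  .getOrCreate()
import spark.implicits._

// Stand-in for: spark.read.format("delta").load("path_to_delta")
val appleMart = Seq("a1", "a2").toDF("appleId")

// Persist so each micro-batch's join reuses the in-memory copy
// instead of re-reading the Delta table from storage
val cachedMart = appleMart.persist(StorageLevel.MEMORY_AND_DISK)
cachedMart.count()  // force materialization of the cache
```

Note that a cached static DataFrame reflects the table as of the time it was read; if appleMart does change, you would need to unpersist and reload it.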