Querying Delta Lake from Inside of a UDF in Databricks


Question

I need to perform some queries against a table from inside a UDF in Structured Streaming. The problem is that if I try to use spark.sql inside the UDF, I get a null pointer exception. What is the best approach here?

Basically, I need to stream from one table and then use that data to perform some range queries against another table.

For example:

// This throws a NullPointerException: spark.sql cannot be called
// from inside a UDF, which runs on the executors.
val appleFilter = udf((appleId: String) => {
  val query = s"select count(*) from appleMart where appleId='$appleId'"
  val appleCount = spark.sql(query).collect().head.getLong(0)
  appleCount > 0
})

val newApple = apples.filter(appleFilter($"appleId"))


Answer

This is not really a correct approach for this task: you shouldn't issue separate queries from inside a UDF, as Spark won't be able to parallelize or optimize them.

A better way is simply to do a join between your streaming dataframe and the appleMart dataframe; this lets Spark optimize all operations. As I understand from your code, you just need to check whether an apple with the given ID exists. In that case you can use an inner join, which keeps only the IDs that have matching rows in appleMart, something like this:

val appleMart = spark.read.format("delta").load("path_to_delta")
val newApple = apples.join(appleMart, apples("appleId") === appleMart("appleId"))
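Since the join above is only an existence check, a left-semi join may be a cleaner fit: it keeps each row of apples that has a match in appleMart without pulling in appleMart's columns (and without duplicating the appleId column). A minimal sketch, assuming the same dataframes as above:

```scala
// Left-semi join: rows of `apples` whose appleId exists in appleMart,
// with only the columns of `apples` in the result.
val appleMart = spark.read.format("delta").load("path_to_delta")
val newApple = apples.join(appleMart, Seq("appleId"), "left_semi")
```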

If for some reason you need to keep the apples entries that don't exist in appleMart, you can use a left join instead...
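A sketch of that variant, assuming the same dataframes as above: a left outer join keeps every row of apples, and the hypothetical inMart flag below marks whether a matching ID was found (rows with no match would otherwise just get nulls in appleMart's columns).

```scala
import org.apache.spark.sql.functions.lit

// Left outer join: keep all apples, flag whether the appleId
// exists in appleMart.
val marked = appleMart.select($"appleId").withColumn("inMart", lit(true))
val joined = apples.join(marked, Seq("appleId"), "left_outer")
  .na.fill(false, Seq("inMart")) // unmatched rows get inMart = false
```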

P.S. If appleMart doesn't change very often, you can cache it. Although, for streaming jobs, a lookup store like Cassandra could be better for lookup tables from a performance standpoint.
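A minimal sketch of that caching, assuming the same load path as above; note that caching pins the table as of the time it was read, so later updates to the Delta table won't be visible until you uncache and reload:

```scala
// Cache the lookup table once so each micro-batch of the streaming
// query reuses it instead of re-reading the Delta files.
val appleMart = spark.read.format("delta").load("path_to_delta").cache()
```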

