Loop through dataframe and update the lookup table simultaneously: spark scala
Problem description
I have a DataFrame as follows.
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
I have to populate a second DataFrame, which would initially be empty, as follows.
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    6|
|  3|      1119024|    3|
+---+-------------+-----+
Output explanation
The first row in the first DataFrame has a scale of 6. Check the scale column for that value minus 1 (so scale equals 5). There is none, so simply add the row (1, 1500847, 6) to the output unchanged.
The second row in the first DataFrame has a scale of 7. The original table already has a row with scale 7 - 1, so add this row but with that scale: (2, 1501199, 6).
The third row works like the first: no row with scale 3 - 1 exists, so it is added unchanged.
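For reference, here is a minimal sketch that reproduces the input DataFrame used in the answers below (the SparkSession setup is an assumption; in a spark-shell, spark and sc already exist):
import org.apache.spark.sql.SparkSession
// assumption: a local session; any existing SparkSession works the same way
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
val df = Seq(
  (1, 1500847, 6),
  (2, 1501199, 7),
  (3, 1119024, 3)
).toDF("id", "AccountNumber", "scale")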
Recommended answer
Using a broadcast list
You can collect all the scales in the scale column as an Array and broadcast it to be used in a udf function. Then use the udf function in when logic with withColumn as
import org.apache.spark.sql.functions._
// collect every value of the scale column into a broadcast variable
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])
// true if the given scale exists in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))
// if scale - 1 is present in the table, use it; otherwise keep the original scale
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)
You should have the required output
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
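As a side note (not part of the original answer), the same membership check can be sketched without a udf by collecting the scales to the driver and using the built-in isin expression; scales here is a hypothetical local value, and the approach assumes the list of scales fits in driver memory:
import org.apache.spark.sql.functions._
// collect the scale values to the driver (assumption: they fit in memory)
val scales = df.select("scale").collect().map(_.getInt(0)).toSeq
df.withColumn("scale",
    when((col("scale") - 1).isin(scales: _*), col("scale") - 1)
      .otherwise(col("scale")))
  .show(false)
Keeping the comparison in built-in expressions leaves it visible to Catalyst, whereas a udf is opaque to the optimizer.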
Using a Window function
The solution I am going to suggest would require you to collect all the data in one executor using a Window function to form another column scaleCheck, which will be populated with all the scales present in the scale column as
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
// an unbounded frame over the whole dataset, so collect_list sees every row
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
This would give you the dataframe
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
Then you would have to write a udf function to check whether the scale in the row is already present in the collected list. Then, using the when function and calling the udf function, you can generate the scale value
import org.apache.spark.sql.functions._
// true if the given scale exists in this row's scaleCheck list
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)
This gives you the final required dataframe shown above.
I hope the answer is helpful.