Loop through dataframe and update the lookup table simultaneously: spark scala
Problem description
I have a DataFrame like the following.
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
I have to populate a second DataFrame, which would initially be empty, as follows.
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    6|
|  3|      1119024|    3|
+---+-------------+-----+
Output explanation
The first row in the first DataFrame has a scale of 6. Check for that value minus 1 (so scale equals 5) in the result. There is none, so simply add the row (1, 1500847, 6) to the output.
The second row in the first DataFrame has a scale of 7. The original table already has a row with scale 7 - 1 = 6, so add this row but with that scale: (2, 1501199, 6).
The third row works like the first.
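For reference, the input DataFrame above can be reproduced locally with something like the following sketch (an editorial illustration, not part of the original question; it assumes a SparkSession in scope as spark, with its implicits imported):
import spark.implicits._
// recreate the sample input shown above
val df = Seq(
  (1, 1500847, 6),
  (2, 1501199, 7),
  (3, 1119024, 3)
).toDF("id", "AccountNumber", "scale")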
Recommended answer
Using a broadcast list
You can collect all the scales in the scale column as an Array and broadcast it, to be used in a udf function. Then use the udf function in when logic with withColumn, as
import org.apache.spark.sql.functions._
// collect all scale values into a local array and broadcast it to the executors
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])
// returns true if the given scale is present in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)
You should have the desired output:
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
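As a side note (not in the original answer), the same check can be written without a udf by collecting the scales into a plain Scala sequence and using Column.isin. A minimal sketch, assuming the same df as above and that the scale column is small enough to collect to the driver:
import org.apache.spark.sql.functions._
// collect the scale column to the driver as a local Seq[Int]
val scales = df.select("scale").collect().map(_.getInt(0)).toSeq
// if scale - 1 is already a known scale, use it; otherwise keep the original scale
df.withColumn("scale",
    when((col("scale") - 1).isin(scales: _*), col("scale") - 1)
      .otherwise(col("scale")))
  .show(false)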
Using a Window function
The solution I am going to suggest would require you to collect all the data in one executor, using a Window function to form another column, scaleCheck, which will be populated with all the scales present in the scale column, as
import org.apache.spark.sql.expressions.Window
// unbounded frame over the whole dataset, so collect_list sees every row
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
This would give you the dataframe
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
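As an aside, the rowsBetween(Long.MinValue, Long.MaxValue) frame above can also be spelled with the named boundary constants, which (to my knowledge) are available since Spark 2.1; an equivalent sketch:
import org.apache.spark.sql.expressions.Window
// the same unbounded frame, written with the named constants
def windowSpec = Window.orderBy("id")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)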
Then you would have to write a udf function to check whether the scale in the row is already present in the collected list. Then, using the when function and calling the udf function, you can generate the scale value.
import org.apache.spark.sql.functions._
// returns true if the given scale is present in the collected scaleCheck list
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)
So your final required dataframe is achieved, which is given above.
I hope the answer is helpful.