Loop through dataframe and update the lookup table simultaneously: spark scala

Problem description

I have a DataFrame like the following.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    7|
|  3|      1119024|    3|
+---+-------------+-----+
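
For reference, this input can be reproduced with a minimal sketch (assuming a SparkSession named spark):

import spark.implicits._

// Build the input DataFrame with the column names from the question
val df = Seq(
  (1, 1500847, 6),
  (2, 1501199, 7),
  (3, 1119024, 3)
).toDF("id", "AccountNumber", "scale")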

I have to populate a second DataFrame, which would initially be empty, as follows.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    6|
|  3|      1119024|    3|
+---+-------------+-----+

Output explanation

The first row in the first DataFrame has a scale of 6. Check for that value minus 1 (so a scale equal to 5) in the result. There is none, so simply add the row (1, 1500847, 6) to the output.

The second row in the first DataFrame has a scale of 7. The original table already has a row with scale 7 - 1, so add this row but with that scale: (2, 1501199, 6).

The third row is handled like the first.
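
In other words, the rule can be stated as a tiny piece of plain Scala (a sketch; adjustScale is a hypothetical helper, not part of the question):

// Given the set of scales present in the input, decrement a scale
// when scale - 1 is already present
def adjustScale(scale: Int, existingScales: Set[Int]): Int =
  if (existingScales.contains(scale - 1)) scale - 1 else scale

val scales = Set(6, 7, 3)  // scales present in the input
adjustScale(6, scales)     // 6: 5 is absent, keep as-is
adjustScale(7, scales)     // 6: 6 is present, so decrement
adjustScale(3, scales)     // 3: 2 is absent, keep as-is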

Recommended answer

Using a broadcast list

You can collect all the values in the scale column as an Array and broadcast it, to be used in a udf function. Then use the udf function in when logic with withColumn, as follows:

import org.apache.spark.sql.functions._

// Collect every value of the scale column into an Array on the driver
// and broadcast it to the executors
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])

// Returns true if the given scale is present in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))

// Decrement scale whenever scale - 1 already exists in the list
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)

You should have the desired output:

+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1  |1500847      |6    |
|2  |1501199      |6    |
|3  |1119024      |3    |
+---+-------------+-----+

Using a Window function

The solution I am going to suggest requires you to collect all the data in one executor using a Window function, forming another column scaleCheck that is populated with all the scales present in the scale column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// A single window spanning all rows, so every row sees the complete list of scales
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))

This would give you the following dataframe:

+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1  |1500847      |6    |[6, 7, 3] |
|2  |1501199      |7    |[6, 7, 3] |
|3  |1119024      |3    |[6, 7, 3] |
+---+-------------+-----+----------+

Then you would have to write a udf function to check whether a given scale is already present in the collected list. Using the when function and calling the udf function, you can generate the scale value:

import org.apache.spark.sql.functions._

// Returns true if the given scale is present in the collected list
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))

// Decrement scale whenever scale - 1 already exists, then drop the helper column
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)
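
As a side note, the udf can be avoided entirely with the SQL array_contains function (a sketch, assuming a Spark version whose SQL array_contains accepts a column expression as its second argument):

import org.apache.spark.sql.functions._

// Same logic as above, expressed without a udf
tempdf.withColumn("scale",
    when(expr("array_contains(scaleCheck, scale - 1)"), col("scale") - 1)
      .otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)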

So your final required dataframe is achieved, as shown above.

I hope the answer is helpful.
