Loop through dataframe and update the lookup table simultaneously: spark scala


Question

I have a DataFrame as follows.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    7|
|  3|      1119024|    3|
+---+-------------+-----+
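
(For reference, a minimal sketch of how such a DataFrame could be created for testing; it assumes a SparkSession named spark is in scope, and the variable name df matches the answer below.)

import spark.implicits._

val df = Seq(
  (1, 1500847, 6),
  (2, 1501199, 7),
  (3, 1119024, 3)
).toDF("id", "AccountNumber", "scale")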

I have to populate a second DataFrame, which would initially be empty, as follows.

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    6|
|  3|      1119024|    3|
+---+-------------+-----+

Output explanation

The first row in the first DataFrame has a scale of 6. Check whether that value minus 1 (so scale equals 5) is present in the scale column. It is not, so simply add the row (1, 1500847, 6) to the output.

The second row in the first DataFrame has a scale of 7. The original table already has a row with scale 7 - 1 (i.e. 6), so add this row but with that scale: (2, 1501199, 6).

The third row is handled the same way as the first.
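
In other words, the rule is: if scale - 1 already occurs somewhere in the scale column, replace scale with scale - 1, otherwise keep it. A minimal plain-Scala illustration of this rule (local collections only, not the Spark solution; the case class name is just for illustration):

case class Account(id: Int, accountNumber: Long, scale: Int)

val input  = Seq(Account(1, 1500847L, 6), Account(2, 1501199L, 7), Account(3, 1119024L, 3))
val scales = input.map(_.scale).toSet

// replace scale with scale - 1 whenever scale - 1 is already present in the column
val output = input.map(a => if (scales.contains(a.scale - 1)) a.copy(scale = a.scale - 1) else a)
// output: Account(1,1500847,6), Account(2,1501199,6), Account(3,1119024,3)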

Recommended answer

Using a broadcast list

You can collect all the values in the scale column as an Array and broadcast it for use in a udf function. Then use the udf function in when logic with withColumn:

import org.apache.spark.sql.functions._

// collect every value of the scale column into a local array and broadcast it
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])

// udf that checks whether a given scale is present in the broadcast list
def newScale = udf((scale: Int) => collectedList.value.contains(scale))

// if scale - 1 exists in the list, use it; otherwise keep the original scale
df.withColumn("scale", when(newScale(col("scale") - 1), col("scale") - 1).otherwise(col("scale")))
  .show(false)

which should give you the required output

+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1  |1500847      |6    |
|2  |1501199      |6    |
|3  |1119024      |3    |
+---+-------------+-----+
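
As a side note, a similar check can be sketched without a udf using the built-in array_contains function; this variant is an assumption, not part of the original answer, and it assumes Spark 2.x where array_contains accepts a column as the value to look for.

import org.apache.spark.sql.functions._

// collect the scale values locally and turn them into a literal array column
val scales = df.select("scale").collect().map(_.getInt(0)).toSeq
val scalesCol = array(scales.map(lit): _*)

// same rule as above: if scale - 1 occurs in the column, use it, otherwise keep scale
df.withColumn("scale",
    when(array_contains(scalesCol, col("scale") - 1), col("scale") - 1)
      .otherwise(col("scale")))
  .show(false)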

Using Window function

The solution I am going to suggest would require you to collect all the data in one executor using a Window function, forming another column scaleCheck which will be populated with all the scales present in the scale column:

import org.apache.spark.sql.expressions.Window

// a single window spanning the whole dataframe, so every row sees the complete list of scales
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))

which should give you the dataframe

+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1  |1500847      |6    |[6, 7, 3] |
|2  |1501199      |7    |[6, 7, 3] |
|3  |1119024      |3    |[6, 7, 3] |
+---+-------------+-----+----------+
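
Note that this window spec has no partitionBy, so Spark moves all rows into a single partition to compute it; that is the "collect all the data in one executor" cost mentioned above.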

Then you would have to write a udf function to check whether the scale in the row is already present in the collected list. Then, using the when function and calling the udf function, you can generate the scale value:

import org.apache.spark.sql.functions._

// udf that checks whether a given scale is present in the collected list of scales
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int]) => scaleCheck.contains(scale))

// if scale - 1 exists in the collected list, use it; otherwise keep the original scale
tempdf.withColumn("scale", when(newScale(col("scale") - 1, col("scaleCheck")), col("scale") - 1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)

This gives you the final required dataframe, which is the same as shown above.

I hope the answer is helpful.

