Filtering rows based on column values in a Spark DataFrame (Scala)


Question

I have a dataframe (Spark):

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

I want to create a new dataframe:

id  value
3     0
3     1
4     1

I need to remove all the rows after the first 1 (in value) for each id. I tried window functions on the Spark DataFrame (Scala), but couldn't find a solution; it seems I'm going in the wrong direction.

I am looking for a solution in Scala. Thanks.

Output using monotonically_increasing_id:

 scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]

scala> val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
dataWithIndex: org.apache.spark.sql.DataFrame = [id: int, value: int ... 1 more field]

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]

scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

The solution won't work if we apply a sort transformation to the original dataframe: monotonically_increasing_id() is then generated based on the original DF rather than the sorted DF. I had missed that requirement before.

All suggestions are welcome.

Answer

One way is to use monotonically_increasing_id() and a self-join:

import org.apache.spark.sql.functions._  // min, monotonically_increasing_id
import spark.implicits._                 // $-notation and toDF

val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  3|    0|
|  4|    1|
|  4|    0|
|  4|    0|
+---+-----+

Now we generate a column named idx containing an increasing Long:

val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()
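
For intuition, the generated ids look like this (illustrative only: on a single partition they happen to be consecutive from 0; with multiple partitions they are merely increasing, which is all this approach needs):

dataWithIndex.show
+---+-----+---+
| id|value|idx|
+---+-----+---+
|  3|    0|  0|
|  3|    1|  1|
|  3|    0|  2|
|  4|    1|  3|
|  4|    0|  4|
|  4|    0|  5|
+---+-----+---+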

Now we get the min(idx) for each id where value = 1:

val minIdx = dataWithIndex
               .filter($"value" === 1)
               .groupBy($"id")
               .agg(min($"idx"))
               .toDF("r_id", "min_idx")

Now we join the min(idx) back to the original DataFrame:

dataWithIndex.join(
  minIdx,
  ($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

Note: monotonically_increasing_id() generates its value based on the partition of the row. The value may change each time dataWithIndex is re-evaluated. In the code above, because of lazy evaluation, monotonically_increasing_id() is only evaluated when the final show is called.

If you want to force the value to stay the same, for example so you can use show to evaluate the steps above one at a time, uncomment this line above:

//  dataWithIndex.cache()
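
As the question's update points out, this approach is fragile if the DataFrame is re-sorted before the ids are generated. If the rows carry an explicit ordering column, a window function sidesteps the problem entirely. A minimal sketch, assuming a hypothetical ts column that defines the row order within each id (the sample data has none, and dataWithTs stands in for a DataFrame that has it):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Cumulative sum of value within each id, ordered by the
// (hypothetical) ts column: it is 0 before the first 1, exactly 1
// at the first 1, and value === 1 distinguishes the first 1 from
// the 0s that follow it.
val w = Window.partitionBy($"id").orderBy($"ts")

dataWithTs
  .withColumn("ones", sum($"value").over(w))
  .filter($"ones" === 0 || ($"ones" === 1 && $"value" === 1))
  .drop("ones")
  .show

Unlike monotonically_increasing_id(), the window is anchored to ts, so sorting the DataFrame beforehand doesn't change the result.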

