Filtering rows based on column values in spark dataframe scala
Question
I have a DataFrame (Spark) like below:
id value
3 0
3 1
3 0
4 1
4 0
4 0
I want to create a new DataFrame like below:
3 0
3 1
4 1
I need to remove, for each id, all rows after the first 1 (in value). I tried window functions in the Spark DataFrame API (Scala), but couldn't find a solution. It seems I was going in the wrong direction.
I am looking for a solution in Scala. Thanks.
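Before translating to Spark, the intended semantics (for each id, keep rows up to and including the first row whose value is 1, in original order) can be sketched on plain Scala collections. This is an illustrative reference implementation of the assumed semantics, not Spark code; the object and method names are made up for the sketch:

```scala
// Reference implementation of the desired filter on plain Scala collections.
// Assumed semantics (from the example above): for each id, keep rows up to
// and including the first row whose value is 1, preserving original order.
object KeepUntilFirstOne {
  def filterRows(rows: Seq[(Int, Int)]): Seq[(Int, Int)] =
    rows.zipWithIndex
      .groupBy { case ((id, _), _) => id }              // group rows per id
      .values
      .flatMap { group =>
        // position of the first value == 1 inside this id's group
        val firstOne = group.indexWhere { case ((_, value), _) => value == 1 }
        if (firstOne < 0) Seq.empty else group.take(firstOne + 1)
      }
      .toSeq
      .sortBy { case (_, originalIndex) => originalIndex } // restore input order
      .map { case (row, _) => row }

  def main(args: Array[String]): Unit = {
    val data = Seq((3, 0), (3, 1), (3, 0), (4, 1), (4, 0), (4, 0))
    println(filterRows(data)) // List((3,0), (3,1), (4,1))
  }
}
```

An id with no 1 at all contributes no rows under this reading; adjust `indexWhere` handling if such ids should be kept whole instead.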
Output using monotonically_increasing_id:
scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]
scala> val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
dataWithIndex: org.apache.spark.sql.DataFrame = [id: int, value: int ... 1 more field]

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]
scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
The solution won't work if we apply a sorting transformation to the original DataFrame, because monotonically_increasing_id() is generated based on the original DF rather than the sorted DF. I had missed that requirement before.
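The underlying issue can be illustrated with plain Scala collections: an index must be attached before any sort, so that the original positions stay recoverable afterwards. (In Spark terms, the analogous idea is materialising the idx column before sorting rather than regenerating monotonically_increasing_id() on the re-ordered data.) A minimal sketch, with hypothetical names:

```scala
// Plain-Scala analogy of the pitfall: index first vs. sort first.
object IndexBeforeSort {
  val data = Seq((3, 0), (3, 1), (4, 1), (3, 0))

  // Index first, then sort: each row carries its original position,
  // so the input order can be recovered later.
  val indexedThenSorted = data.zipWithIndex.sortBy { case ((id, _), _) => id }

  // Sort first, then index: the indices describe the SORTED order,
  // and the original order is lost.
  val sortedThenIndexed = data.sortBy { case (id, _) => id }.zipWithIndex

  def main(args: Array[String]): Unit = {
    println(indexedThenSorted) // original positions preserved in the pairs
    println(sortedThenIndexed) // positions renumbered after the sort
  }
}
```

Sorting `indexedThenSorted` back by its stored index reproduces the input exactly; doing the same with `sortedThenIndexed` does not.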
All suggestions are welcome.
Answer
One way is to use monotonically_increasing_id() and a self-join:
import org.apache.spark.sql.functions._

val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 3| 0|
| 4| 1|
| 4| 0|
| 4| 0|
+---+-----+
Now we generate a column named idx with an increasing Long:
val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()
Now we get the min(idx) for each id where value = 1:
val minIdx = dataWithIndex
.filter($"value" === 1)
.groupBy($"id")
.agg(min($"idx"))
.toDF("r_id", "min_idx")
Now we join the min(idx) back to the original DataFrame:
dataWithIndex.join(
minIdx,
($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
| 3| 0|
| 3| 1|
| 4| 1|
+---+-----+
Note: monotonically_increasing_id() generates its value based on the partition of the row, and that value may change each time dataWithIndex is re-evaluated. In the code above, because of lazy evaluation, monotonically_increasing_id() is only evaluated when the final show is called.
If you want to force the values to stay the same, for example so you can use show to evaluate the steps above one at a time, uncomment this line above:
// dataWithIndex.cache()