How do I add a persistent column of row ids to Spark DataFrame?


Question


This question is not new, however I am finding surprising behavior in Spark. I need to add a column of row IDs to a DataFrame. I used the DataFrame method monotonically_increasing_id(), and it does give me an additional column of unique row IDs (which are NOT consecutive, by the way, but are unique).

The problem I'm having is that when I filter the DataFrame, the row IDs in the resulting DataFrame are re-assigned. The two DataFrames are shown below.

  • the first one is the initial DataFrame with row IDs added as follows:

    df.withColumn("rowId", monotonically_increasing_id()) 
    

  • the second DataFrame is the one obtained after filtering on the column P via df.filter(col("P")).

The problem is illustrated by the rowId for custId 169, which was 5 in the initial DataFrame; after filtering, that rowId (5) was re-assigned to custId 773 when custId 169 was filtered out! I don't know why this is the default behavior.

I would want the rowIds to be "sticky"; if I remove rows from the DataFrame I do not want their IDs "re-used", I want them gone along with their rows. Is it possible to do that? I don't see any flag to request this behavior from the monotonically_increasing_id method.

+------+------------+-----+-----+
|custId|    features|    P|rowId|
+------+------------+-----+-----+
|   806|[50,5074,...| true|    0|
|   832|[45,120,1...| true|    1|
|   216|[6691,272...| true|    2|
|   926|[120,1788...| true|    3|
|   875|[54,120,1...| true|    4|
|   169|[19406,21...|false|    5|
+------+------------+-----+-----+

after filtering on P:

+------+------------+-----+-----+
|custId|    features|    P|rowId|
+------+------------+-----+-----+
|   806|[50,5074,...| true|    0|
|   832|[45,120,1...| true|    1|
|   216|[6691,272...| true|    2|
|   926|[120,1788...| true|    3|
|   875|[54,120,1...| true|    4|
|   773|[3136,317...| true|    5|
+------+------------+-----+-----+

Solution

The problem you experience is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.

It doesn't take any parameters, so from the optimizer's perspective it doesn't matter when it is called, and the call can be pushed after all the other operations. Hence the behavior you see.

If you take a look at the code you'll find that this is marked explicitly: the MonotonicallyIncreasingID expression extends Nondeterministic.
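
You can see where the expression ends up being evaluated by printing the query plans; a minimal sketch, assuming the df and column P from the question:

from pyspark.sql.functions import col, monotonically_increasing_id

# explain(True) prints the parsed, analyzed, optimized and physical
# plans, which show where monotonically_increasing_id() is computed
# relative to the filter.
(df
  .withColumn("rowId", monotonically_increasing_id())
  .filter(col("P"))
  .explain(True))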

I don't think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value, for example with a UDF like this:

from pyspark.sql.types import LongType
from pyspark.sql.functions import monotonically_increasing_id, udf

# Identity UDF: it returns the id unchanged, but ties it to the
# filtered column so the two values travel together.
bound = udf(lambda _, v: v, LongType())

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))
  .where("P"))

In general it could be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame, as sketched below.
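
A minimal sketch of that approach, assuming the same df and an active SparkSession (the rowId column name is just for illustration):

# zipWithIndex assigns each row a stable, consecutive index once,
# on the RDD, so later DataFrame filters cannot reassign it.
rdd_with_index = df.rdd.zipWithIndex()

# Row objects are tuples, so append the index and rebuild the
# DataFrame with one extra column name.
df_indexed = rdd_with_index.map(
    lambda row_index: row_index[0] + (row_index[1],)
).toDF(df.columns + ["rowId"])

df_indexed.where("P").show()  # ids 0-4 survive; id 5 disappears with custId 169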
