How do I add a persistent column of row IDs to a Spark DataFrame?


Problem description

This question is not new; however, I am finding surprising behavior in Spark. I need to add a column of row IDs to a DataFrame. I used the DataFrame method monotonically_increasing_id(), and it does give me an additional column of unique row IDs (which are NOT consecutive, by the way, but are unique).

The problem I'm having is that when I filter the DataFrame, the row IDs in the resulting DataFrame are re-assigned. The two DataFrames are shown below.

  • The first one is the initial DataFrame with row IDs added as follows:

df.withColumn("rowId", monotonically_increasing_id()) 

  • The second DataFrame is the one obtained after filtering on the column P via df.filter(col("P")).

    The problem is illustrated by the rowId for custId 169, which was 5 in the initial DataFrame, but after filtering, that rowId (5) was re-assigned to custId 773 when custId 169 was filtered out! I don't know why this is the default behavior.

    I would want the rowIds to be "sticky"; if I remove rows from the DataFrame, I do not want their IDs "re-used", I want them gone along with their rows. Is it possible to do that? I don't see any flag to request this behavior from the monotonically_increasing_id method.

    +------+-------------+-----+-----+
    |custId|     features|    P|rowId|
    +------+-------------+-----+-----+
    |   806| [50,5074,...| true|    0|
    |   832| [45,120,1...| true|    1|
    |   216| [6691,272...| true|    2|
    |   926| [120,1788...| true|    3|
    |   875| [54,120,1...| true|    4|
    |   169| [19406,21...|false|    5|
    +------+-------------+-----+-----+

    after filtering on P:

    +------+-------------+-----+-----+
    |custId|     features|    P|rowId|
    +------+-------------+-----+-----+
    |   806| [50,5074,...| true|    0|
    |   832| [45,120,1...| true|    1|
    |   216| [6691,272...| true|    2|
    |   926| [120,1788...| true|    3|
    |   875| [54,120,1...| true|    4|
    |   773| [3136,317...| true|    5|
    +------+-------------+-----+-----+
    

    Recommended answer

    Spark 2.0

    A similar issue has been resolved in Spark 2.1 with SPARK-14393.

    Spark 1.x

    The problem you are experiencing is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.

    It doesn't take any parameters, so from the optimizer's perspective it doesn't matter when it is called, and the call can be pushed after all other operations. Hence the behavior you see.
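
    One way to see this for yourself (a minimal sketch, assuming a DataFrame df with a boolean column P as in the question) is to print the extended plan and check where the ID assignment ends up relative to the filter:

    from pyspark.sql.functions import col, monotonically_increasing_id

    (df
      .withColumn("rowId", monotonically_increasing_id())
      .filter(col("P"))
      .explain(True))  # prints the parsed, analyzed, optimized and physical plans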

    If you take a look at the code, you'll find that this is marked explicitly: the MonotonicallyIncreasingID expression extends Nondeterministic.

    I don't think there is an elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value, for example with a UDF like this:

    from pyspark.sql.types import LongType
    from pyspark.sql.functions import monotonically_increasing_id, udf

    # Identity UDF: it ignores its first argument and returns the second one,
    # but referencing the filter column creates an artificial dependency on it.
    bound = udf(lambda _, v: v, LongType())

    (df
      .withColumn("rn", monotonically_increasing_id())
      # Due to nondeterministic behavior it has to be a separate step
      .withColumn("rn", bound("P", "rn"))
      .where("P"))
    

    In general, it is cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame, as sketched below.
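
    A minimal sketch of that approach (assuming the df from the question and a Spark 2.x SparkSession named spark; in Spark 1.x use sqlContext.createDataFrame instead):

    from pyspark.sql import Row

    # Pair every row with a consecutive index, fold the index back into each Row,
    # and rebuild the DataFrame; rowId is now ordinary data and survives filtering.
    with_ids = spark.createDataFrame(
        df.rdd
          .zipWithIndex()
          .map(lambda pair: Row(rowId=pair[1], **pair[0].asDict())))

    with_ids.where("P").show()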

    * The workaround shown above is no longer a valid solution (nor is it required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.

