PySpark DataFrames - way to enumerate without converting to Pandas?


Problem description

I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating the records, so that I can access a record by a certain index (or select a group of records from a range of indexes).

In Pandas I could just do:

indexes=[2,3,6,7] 
df[indexes]

Here I want something similar, without converting the dataframe to Pandas.

The closest I could get is:

  • Enumerating all the objects in the original dataframe by:

indexes=np.arange(df.count())
df_indexed=df.withColumn('index', indexes)

  • Searching for the values I need using the where() function.
  • Questions:

    1. Why doesn't it work, and how do I make it work? How do I add a row to a dataframe?
    2. Would it work later to do something like:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    

  • Any faster and simpler way to deal with it?

Recommended answer

    It doesn't work because:

    1. the second argument of withColumn should be a Column, not a collection; np.array won't work here (see the short sketch after this list)
    2. when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier
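
    For point 1, a minimal sketch of what withColumn does accept, namely a Column expression; lit and the column name "const" here are just for illustration:

    from pyspark.sql.functions import lit

    # The second argument must be a Column expression,
    # not a Python list or a NumPy array
    df.withColumn("const", lit(1))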

    PySpark >= 1.4.0

    You can add row numbers using the respective window function and query using the Column.isin method or a properly formatted query string:

    from pyspark.sql.functions import col, rowNumber
    from pyspark.sql.window import Window
    
    w = Window.orderBy()
    indexed = df.withColumn("index", rowNumber().over(w))
    
    # Using DSL
    indexed.where(col("index").isin(set(indexes)))
    
    # Using SQL expression
    indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
    

    It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
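
    If you want to check this on your own data, a small diagnostic sketch (not part of the solution itself) that compares the number of partitions before and after applying the window:

    # Partitions of the original DataFrame
    print(df.rdd.getNumPartitions())

    # After a window with no PARTITION BY, everything typically lands in a single partition
    print(indexed.rdd.getNumPartitions())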

    Any faster and simpler way to deal with it?

    Not really. Spark DataFrames don't support random row access.

    A PairedRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned with a HashPartitioner (a small sketch follows below). There is also the indexed-rdd project, which supports efficient lookups.
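
    A minimal sketch of that idea, assuming the position produced by zipWithIndex is used as the key (the variable names and the partition count are illustrative only):

    # Key each row by its position and hash-partition by that key,
    # so lookup only has to scan the matching partition
    keyed = (df.rdd
        .zipWithIndex()
        .map(lambda ri: (ri[1], ri[0]))  # (index, row)
        .partitionBy(8))                 # hash partitioning by key, 8 partitions as an example

    # Fetch the row(s) stored under index 3
    keyed.lookup(3)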

    Edit:

    Independent of the PySpark version, you can try something like this:

    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, LongType
    from pyspark.sql.functions import col  # needed for the isin filter at the end
    
    row = Row("char")
    row_with_index = Row("char", "index")
    
    df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
    df.show(5)
    
    ## +----+
    ## |char|
    ## +----+
    ## |   a|
    ## |   b|
    ## |   c|
    ## |   d|
    ## |   e|
    ## +----+
    ## only showing top 5 rows
    
    # This part is not tested but should work and save some work later
    schema = StructType(
        df.schema.fields[:] + [StructField("index", LongType(), False)])
    
    indexed = (df.rdd # Extract rdd
        .zipWithIndex() # Add index
        .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
        .toDF(schema)) # It will work without schema but will be more expensive
    
    # Use inSet instead of isin in Spark < 1.3
    indexed.where(col("index").isin(indexes))
    

