PySpark DataFrames - way to enumerate without converting to Pandas?


Problem description

I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating the records, so that I can access a record by a certain index (or select a group of records within an index range).

In pandas, I could simply do:

indexes=[2,3,6,7] 
df[indexes]

Here I want something similar (and without converting the DataFrame to pandas).

The closest I can get is:

  • Enumerating all the objects in the original dataframe by:

    indexes=np.arange(df.count())
    df_indexed=df.withColumn('index', indexes)

  • Searching for the values I need using the where() function.

QUESTIONS:

  1. Why doesn't it work, and how can I make it work? How do I add a row to a DataFrame?
  2. Would it work later to do something like:

     indexes=[2,3,6,7] 
     df1.where("index in indexes").collect()
    

  3. Any faster and simpler way to deal with it?

Solution

It doesn't work because:

  1. the second argument for withColumn should be a Column, not a collection; np.array won't work here
  2. when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier
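
For illustration only (not from the original answer; the DataFrame and column names here are made up): withColumn expects a Column expression that Spark evaluates row by row, not a local collection such as an np.array.

from pyspark.sql.functions import col, lit

# A toy DataFrame just for this sketch (assumes an existing sqlContext)
toy = sqlContext.createDataFrame([(1.0,), (2.0,), (3.0,)], ["value"])

toy.withColumn("one", lit(1))                 # constant literal column
toy.withColumn("value_x2", col("value") * 2)  # expression built from an existing column

# The attempt from the question fails because a NumPy array is not a Column:
# toy.withColumn("index", np.arange(toy.count()))  # raises an error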

PySpark >= 1.4.0

You can add row numbers using the corresponding window function and query using the Column.isin method or a properly formatted query string:

from pyspark.sql.functions import col, rowNumber  # renamed to row_number in Spark >= 1.6
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
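
A side note beyond the original answer: if a unique, increasing (but not necessarily consecutive) id is enough, monotonically_increasing_id avoids that shuffle entirely; the sketch below assumes Spark >= 1.6 (older releases expose it as monotonicallyIncreasingId).

from pyspark.sql.functions import monotonically_increasing_id

# The empty window spec above typically collapses everything into one partition:
indexed.rdd.getNumPartitions()

# Ids below are unique and increasing but contain gaps (the partition id is encoded in the upper bits):
with_id = df.withColumn("id", monotonically_increasing_id())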

Any faster and simpler way to deal with it?

Not really. Spark DataFrames don't support random row access.

A PairedRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
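
A rough sketch of that lookup approach (not part of the original answer; the partition count and looked-up index are arbitrary):

# Key each row by its zipWithIndex position, hash-partition by key, then look rows up
pair_rdd = (df.rdd
    .zipWithIndex()                  # (Row, index)
    .map(lambda ri: (ri[1], ri[0]))  # (index, Row)
    .partitionBy(8))                 # 8 partitions chosen arbitrarily

pair_rdd.cache()
pair_rdd.lookup(3)  # returns a list containing the Row at position 3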

Edit:

Independent of the PySpark version, you can try something like this:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## char
## a   
## b   
## c   
## d   
## e  

# This part is not tested but should work and save some work later
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd # Extract rdd
    .zipWithIndex() # Add index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])) # Map to rows
    .toDF(schema)) # It will work without schema but will be more expensive

indexed.where(col("index").inSet(indexes))  # Column.inSet is deprecated since Spark 1.5; use isin there
