PySpark DataFrames - way to enumerate without converting to Pandas?
Question
.toDF(模式))#这将工作,没有模式,但也将更加昂贵indexed.where(COL(指数)。插页(指数))I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating records- thus, being able to access record with certain index. (or select group of records with indexes range)
In pandas, I could simply do:

    indexes = [2, 3, 6, 7]
    df[indexes]
Here I want something similar (without converting the dataframe to pandas).

The closest I can get is:

- Enumerating all the objects in the original dataframe by:

      indexes = np.arange(df.count())
      df_indexed = df.withColumn('index', indexes)

- Searching for the values I need using the where() function.
QUESTIONS:

- Why doesn't it work, and how can I make it work? How can I add a row to a dataframe?
- Would it work later to make something like:

      indexes = [2, 3, 6, 7]
      df1.where("index in indexes").collect()

- Is there any faster and simpler way to deal with it?
Solution

It doesn't work because:
- the second argument for withColumn should be a Column, not a collection; np.array won't work here
- when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier
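As a minimal illustration of the first point (the column name "zero" is just a placeholder, not from the original post): withColumn expects a Column expression, so a literal has to be wrapped with lit(), while passing a NumPy array raises an error:

    import numpy as np
    from pyspark.sql.functions import lit

    # Works: lit() wraps a Python literal in a Column expression
    df.withColumn("zero", lit(0))

    # Fails: a NumPy array is not a Column
    # df.withColumn("index", np.arange(df.count()))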
PySpark >= 1.4.0

You can add row numbers using the corresponding window function, and query using the Column.isin method or a properly formatted query string:

    from pyspark.sql.functions import col, rowNumber
    from pyspark.sql.window import Window

    w = Window.orderBy()
    indexed = df.withColumn("index", rowNumber().over(w))

    # Using DSL
    indexed.where(col("index").isin(set(indexes)))

    # Using SQL expression
    indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
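For reference, in newer PySpark releases rowNumber was renamed row_number, and the window has to be ordered for it to run. A rough equivalent (the ordering column "some_col" is a placeholder for one of df's columns, not something from the original answer):

    from pyspark.sql.functions import col, row_number
    from pyspark.sql.window import Window

    # Newer Spark versions reject row_number() over an unordered window,
    # so order by an existing column ("some_col" is a placeholder)
    w = Window.orderBy("some_col")
    indexed = df.withColumn("index", row_number().over(w))
    indexed.where(col("index").isin(set(indexes)))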
It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
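A side note that is not part of the original answer: if unique (but not consecutive or positional) identifiers are enough, monotonically_increasing_id assigns IDs without shuffling everything to one partition. It cannot answer "give me rows 2, 3, 6, 7", since the IDs are not row positions:

    from pyspark.sql.functions import monotonically_increasing_id

    # IDs are unique and increasing, but not consecutive and not positional
    df_with_id = df.withColumn("id", monotonically_increasing_id())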
"Any faster and simpler way to deal with it?"

Not really. Spark DataFrames don't support random row access.

A PairedRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
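A minimal sketch of the lookup approach, assuming we key each row by its position; the partition count is arbitrary and none of this is from the original answer:

    # Key each Row by its position and hash-partition by that key
    pairs = (df.rdd
             .zipWithIndex()
             .map(lambda ri: (ri[1], ri[0]))  # (index, Row)
             .partitionBy(8)                  # hash partitioning by default
             .cache())

    pairs.lookup(3)  # returns a list with the Row stored under key 3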
Edit:

Independent of the PySpark version, you can try something like this:
    from pyspark.sql import Row
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StructField, LongType

    row = Row("char")
    row_with_index = Row("char", "index")

    df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
    df.show(5)

    ## char
    ## a
    ## b
    ## c
    ## d
    ## e

    # This part is not tested but should work and save some work later
    schema = StructType(
        df.schema.fields[:] + [StructField("index", LongType(), False)])

    indexed = (df.rdd  # Extract rdd
        .zipWithIndex()  # Add index
        .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))  # Map to rows
        .toDF(schema))  # It will work without schema but will be more expensive

    indexed.where(col("index").inSet(indexes))
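A usage sketch against the indexed frame above: inSet was later deprecated in favor of isin (around Spark 1.5), and Column.between covers the range-selection case from the question:

    from pyspark.sql.functions import col

    indexes = [2, 3, 6, 7]

    # isin replaces the deprecated inSet in newer releases
    indexed.where(col("index").isin(indexes)).show()

    # Selecting a contiguous range of indices
    indexed.where(col("index").between(2, 7)).show()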