How to retrieve all columns using the pyspark collect_list function


Problem description

I am using PySpark 2.0.1. I am trying to group my data frame and retrieve the values of all the fields from it. I found that

z=data1.groupby('country').agg(F.collect_list('names')) 

will give me the values for the country and names attributes, but the names column comes out with the header collect_list(names). For my job I have a dataframe with around 15 columns, and I will run a loop that changes the groupby field on each iteration and needs the output for all of the remaining fields. Could you please suggest how to do this using collect_list() or any other pyspark function?
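As a side note on the column header, a minimal sketch (assuming the same data1 frame with country and names columns as above) is to rename the aggregated column with alias so the header is not collect_list(names):

from pyspark.sql import functions as F

# Rename the aggregated column so the header reads "names" instead of "collect_list(names)".
z = data1.groupby('country').agg(F.collect_list('names').alias('names'))
z.show()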

I also tried this code:

from pyspark.sql import functions as F

fieldnames = data1.schema.names
names1 = list()
for item in fieldnames:
    if item != 'names':
        names1.append(item)

# Passing a Python list to collect_list raises the error below.
z = data1.groupby('names').agg(F.collect_list(names1))
z.show()

but received this error message:

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.collect_list. Trace: py4j.Py4JException: Method collect_list([class java.util.ArrayList]) does not exist 
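The error occurs because collect_list accepts a single column, not a Python list of column names. A hedged sketch of one workaround (assuming the same data1 and names1 as above) is to build one collect_list expression per remaining column and unpack the list into agg:

from pyspark.sql import functions as F

# One collect_list per remaining column; the * unpacks the list of
# Column expressions into separate arguments of agg().
exprs = [F.collect_list(c).alias(c) for c in names1]
z = data1.groupby('names').agg(*exprs)
z.show()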

Recommended answer

Use struct to combine the columns before calling groupBy.

Suppose you have a dataframe:

from pyspark.sql import functions as f

df = spark.createDataFrame(sc.parallelize([(0,1,2),(0,4,5),(1,7,8),(1,8,7)])).toDF("a","b","c")

df = df.select("a", f.struct(["b","c"]).alias("newcol"))
df.show()
+---+------+
|  a|newcol|
+---+------+
|  0| [1,2]|
|  0| [4,5]|
|  1| [7,8]|
|  1| [8,7]|
+---+------+
df = df.groupBy("a").agg(f.collect_list("newcol").alias("collected_col"))
df.show()
+---+--------------+
|  a| collected_col|
+---+--------------+
|  0|[[1,2], [4,5]]|
|  1|[[7,8], [8,7]]|
+---+--------------+
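If the grouping column changes on each loop iteration, as described in the question, the struct can be built over whatever columns remain. A sketch under that assumption, where group_col is a hypothetical loop variable and df is the original three-column frame ("a", "b", "c") from above:

group_col = "a"  # hypothetical: changes on each loop iteration
other_cols = [c for c in df.columns if c != group_col]

# Pack every non-grouping column into one struct, then collect it per group.
grouped = (df.select(group_col, f.struct(*other_cols).alias("newcol"))
             .groupBy(group_col)
             .agg(f.collect_list("newcol").alias("collected_col")))
grouped.show()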

An aggregation operation can only be applied to a single column.

After aggregating, you can collect the result and iterate over it to separate the combined columns and build the index dict yourself, or you can write a UDF to separate the combined columns:

from pyspark.sql.types import *
from pyspark.sql.functions import udf, col

def foo(x):
    # x is a list of (b, c) structs; split it into two parallel lists.
    x1 = [y[0] for y in x]
    x2 = [y[1] for y in x]
    return (x1, x2)

st = StructType([StructField("b", ArrayType(LongType())), StructField("c", ArrayType(LongType()))])
udf_foo = udf(foo, st)
df = df.withColumn("ncol", 
                  udf_foo("collected_col")).select("a",
                  col("ncol").getItem("b").alias("b"), 
                  col("ncol").getItem("c").alias("c"))
df.show()

+---+------+------+
|  a|     b|     c|
+---+------+------+
|  0|[1, 4]|[2, 5]|
|  1|[7, 8]|[8, 7]|
+---+------+------+
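As a possible alternative to the UDF, a struct field can be extracted directly from an array-of-structs column, which yields the array of that field's values. A minimal sketch, assuming grouped is a frame with the a and collected_col columns as produced by the groupBy step above:

from pyspark.sql.functions import col

# Field extraction on an array<struct> column returns an array of that
# field's values, so no Python UDF is required.
result = grouped.select("a",
                        col("collected_col.b").alias("b"),
                        col("collected_col.c").alias("c"))
result.show()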
