Counter function on an ArrayColumn in PySpark


Question

Starting with this data frame:

+-----+------------------+
|store|            values|
+-----+------------------+
|    1|[1, 2, 3, 4, 5, 6]|
|    2|            [2, 3]|
+-----+------------------+

I would like to apply the Counter function to get this:

+-----+------------------------------+
|store|     values                   |
+-----+------------------------------+
|    1|{1:1, 2:1, 3:1, 4:1, 5:1, 6:1}|
|    2|{2:1, 3:1}                    |
+-----+------------------------------+
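
For reference, this is just what the standard library's collections.Counter produces for a plain Python list, once converted to a dict:

from collections import Counter

dict(Counter([1, 2, 3, 4, 5, 6]))  # {1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1}
dict(Counter([2, 3]))              # {2: 1, 3: 1}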

I got this data frame using the answer to another question:

GroupBy and concat array columns pyspark

So I tried to modify the code in the answers like this:

Option 1:

from collections import Counter
from functools import reduce
import pyspark.sql.functions as sf
import pyspark.sql.types as ty

def flatten_counter(val):
    # Concatenate the nested lists, then count occurrences
    return Counter(reduce(lambda x, y: x + y, val))

udf_flatten_counter = sf.udf(flatten_counter, ty.ArrayType(ty.IntegerType()))
df3 = df2.select("store", udf_flatten_counter("values2").alias("values3"))
df3.show(truncate=False)

Option 2:

(df.rdd
   .map(lambda r: (r.store, r.values))
   .reduceByKey(lambda x, y: x + y)
   .map(lambda row: Counter(row[1]))
   .toDF(['store', 'values'])
   .show())

But it doesn't work.

Does anybody know how I can do it?

Thanks

Solution

You just have to provide the correct data type:

from collections import Counter
import pyspark.sql.functions as sf
import pyspark.sql.types as ty

# Return a dict and declare a matching MapType (not ArrayType)
udf_flatten_counter = sf.udf(
    lambda x: dict(Counter(x)),
    ty.MapType(ty.IntegerType(), ty.IntegerType()))

df = spark.createDataFrame(
    [(1, [1, 2, 3, 4, 5, 6]), (2, [2, 3])], ("store", "values"))

df.withColumn("cnt", udf_flatten_counter("values")).show(2, False)
# +-----+------------------+---------------------------------------------------+
# |store|values            |cnt                                                |
# +-----+------------------+---------------------------------------------------+
# |1    |[1, 2, 3, 4, 5, 6]|Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
# |2    |[2, 3]            |Map(2 -> 1, 3 -> 1)                                |
# +-----+------------------+---------------------------------------------------+
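
As a side note, on Spark 2.4+ the same counts can be computed without a Python UDF at all, using only built-in SQL functions; a minimal sketch (the v and cnt names are illustrative):

from pyspark.sql import functions as F

# Explode the array, count occurrences per (store, value) pair,
# then fold the pairs back into a single map column.
(df.select("store", F.explode("values").alias("v"))
   .groupBy("store", "v").count()
   .groupBy("store")
   .agg(F.map_from_entries(
       F.collect_list(F.struct("v", "count"))).alias("cnt"))
   .show(truncate=False))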

And similarly with RDD:

df.rdd.mapValues(Counter).mapValues(dict).toDF(["store", "values"]).show(2, False)
# +-----+---------------------------------------------------+
# |store|values                                             |
# +-----+---------------------------------------------------+
# |1    |Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)|
# |2    |Map(2 -> 1, 3 -> 1)                                |
# +-----+---------------------------------------------------+

Conversion to dict is necessary because apparently Pyrolite cannot handle Counter objects.
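
For completeness, a more explicit version of the same RDD round-trip (just a sketch; it spells out the field access and the dict conversion):

from collections import Counter

(df.rdd
   .map(lambda r: (r["store"], dict(Counter(r["values"]))))  # dict(), not Counter
   .toDF(["store", "values"])
   .show(2, False))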
