pyspark: arrays_zip equivalent in Spark 2.3


Problem description

How to write the equivalent function of arrays_zip in Spark 2.3?

Source code from Spark 2.4:

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

How can the same be achieved in PySpark?

Recommended answer

You can use a UDF to obtain the same functionality as arrays_zip. Note that the column types need to be the same for this to work (here, IntegerType). If the column types differ, cast the columns to a common type before applying the UDF.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))

It can be used in the same way as arrays_zip, for example:

df = spark.createDataFrame([([1, 2, 3], [2, 3, 4])], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()
