PySpark - How to use a row value from one column to access another column which has the same name as the row value

Problem description

I have a PySpark df:

+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|
+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1|
|  1|  2| 43|  8| 10| 20| 43| e1|
|  2|  3| 15|  0|  1| 23|  7| b1|
|  3|  4|  2|  6| 11|  5|  8| d1|
|  4|  5|  6|  7|  2|  8|  1| f1|
+---+---+---+---+---+---+---+---+

I eventually want to create another column "out" whose values are based on the "ref" column. For example, in the first row the ref column has b1 as its value, so in the "out" column I would like to see the value of column "b1", i.e. 23. Here is the expected output:

+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1| 23|
|  1|  2| 43|  8| 10| 20| 43| e1| 20|
|  2|  3| 15|  0|  1| 23|  7| b1| 15|
|  3|  4|  2|  6| 11|  5|  8| d1| 11|
|  4|  5|  6|  7|  2|  8|  1| f1|  1|
+---+---+---+---+---+---+---+---+---+

Please advise on how to achieve the "out" column. I'm using Spark version 1.6. Thanks.

Answer

Independent of version, you can convert to an RDD, map over the rows, and convert back to a DataFrame:

df = spark.createDataFrame(
    [(0, 1, 23, 4, 8, 9, 5, "b1"), (1, 2, 43, 8, 10, 20, 43, "e1")], 
    ("id", "a1", "b1", "c1", "d1", "e1", "f1", "ref")
)

# Look up the column named by "ref" in each row and append its value as a new field
df.rdd.map(lambda row: row + (row[row.ref], )).toDF(df.columns + ["out"])

+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1| 23|
|  1|  2| 43|  8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+

You can also preserve the schema:

from pyspark.sql.types import LongType, StructField

# Reuse the original schema, extended with an explicit field for "out"
spark.createDataFrame(
    df.rdd.map(lambda row: row + (row[row.ref], )),
    df.schema.add(StructField("out", LongType()))
)
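
To check that the explicit type was kept, here is a minimal sketch that captures the result under an illustrative name (result, not part of the original snippet) and spells out the extended schema with StructType directly, so the expression above stays untouched:

from pyspark.sql.types import LongType, StructField, StructType

result = spark.createDataFrame(
    df.rdd.map(lambda row: row + (row[row.ref], )),
    StructType(df.schema.fields + [StructField("out", LongType())])
)

# "out" is reported with the declared long type instead of being re-inferred
result.printSchema()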

With DataFrames you can instead compose complex Columns: build a literal map from column name to position, pack the data columns into an array, and index that array with the position looked up for ref. In 1.6:

from pyspark.sql.functions import array, col, udf
from pyspark.sql.types import LongType, MapType, StringType

data_cols = [x for x in df.columns if x not in {"id", "ref"}]

# Literal map from column name to index
name_to_index = udf(
    lambda: {x: i for i, x in enumerate(data_cols)},
    MapType(StringType(), LongType())
)()

# Array of data
data_array = array(*[col(c) for c in data_cols])
df.withColumn("out", data_array[name_to_index[col("ref")]])

+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1| 23|
|  1|  2| 43|  8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+

In 2.x you can skip the intermediate objects and build a map from column name to column value directly with create_map:

from pyspark.sql.functions import create_map, lit, col
from itertools import chain

# Map from column name to column value
name_to_value = create_map(*chain.from_iterable(
    (lit(c), col(c)) for c in data_cols
))

df.withColumn("out", name_to_value[col("ref")])

+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1| 23|
|  1|  2| 43|  8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+

Finally, you can build the same column from a chain of when expressions, falling back to null when ref does not match any data column:

from pyspark.sql.functions import col, lit, when
from functools import reduce

out = reduce(
    lambda acc, x: when(col("ref") == x, col(x)).otherwise(acc), 
    data_cols,
    lit(None)
)
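
Applying the composed expression is the same as in the previous examples:

df.withColumn("out", out)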

+---+---+---+---+---+---+---+---+---+
| id| a1| b1| c1| d1| e1| f1|ref|out|
+---+---+---+---+---+---+---+---+---+
|  0|  1| 23|  4|  8|  9|  5| b1| 23|
|  1|  2| 43|  8| 10| 20| 43| e1| 20|
+---+---+---+---+---+---+---+---+---+
