创建另一列以检查 pyspark 中的不同值 [英] create another columns for checking different value in pyspark

查看:40
本文介绍了创建另一列以检查 pyspark 中的不同值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望得到低于预期的输出:

I wish to have below expected output:

我的代码:

import numpy as np
pd_dataframe = pd.DataFrame({'id': [i for i in range(10)],
                             'values': [10,5,3,-1,0,-10,-4,10,0,10]})

sp_dataframe = spark.createDataFrame(pd_dataframe)
sign_acc_row = F.udf(lambda x: int(np.sign(x)), IntegerType())
sp_dataframe = sp_dataframe.withColumn('sign', sign_acc_row('values'))
sp_dataframe.show()

我想创建另一列,当值与前一行不同时,它会返回额外的 1.

I wanted to create another column with which it returns an additional of 1 when the value is different from previous row.

预期输出:

    id  values  sign    numbering
0   0   10  1   1
1   1   5   1   1
2   2   3   1   1
3   3   -1  -1  2
4   4   0   0   3
5   5   -10 -1  4
6   6   -4  -1  4
7   7   10  1   5
8   8   0   0   6
9   9   10  1   7

推荐答案

您可以通过以下方式使用自定义函数:

Here's a way you can do using a custom function:

import pyspark.sql.functions as F

# compare the next value with previous
def f(x):
    c = 1
    l = [c]
    last_value = [x[0]]
    for i in x[1:]:
        if i == last_value[-1]:
            l.append(c)
        else:
            c += 1
            l.append(c)
        last_value.append(i)
    return l

# take sign column as a list
sign_list = sp_dataframe.select('sign').rdd.map(lambda x: x.sign).collect()

# create a new dataframe using the output
sp = spark.createDataFrame(pd.DataFrame(f(sign_list), columns=['numbering']))

在 pyspark 中将列表作为列附加到数据帧有点棘手.为此,我们需要创建一个虚拟的 row_idx 来加入数据帧.

Append a list as a column to a dataframe is a bit tricky in pyspark. For this we'll need to create a dummy row_idx to join the dataframes.

# create dummy indexes
sp_dataframe = sp_dataframe.withColumn("row_idx", F.monotonically_increasing_id())
sp = sp.withColumn("row_idx", F.monotonically_increasing_id())

# join the dataframes
final_df = (sp_dataframe
            .join(sp, sp_dataframe.row_idx == sp.row_idx)
            .orderBy('id')
            .drop("row_idx"))

final_df.show()

+---+------+----+---------+
| id|values|sign|numbering|
+---+------+----+---------+
|  0|    10|   1|        1|
|  1|     5|   1|        1|
|  2|     3|   1|        1|
|  3|    -1|  -1|        2|
|  4|     0|   0|        3|
|  5|   -10|  -1|        4|
|  6|    -4|  -1|        4|
|  7|    10|   1|        5|
|  8|     0|   0|        6|
|  9|    10|   1|        7|
+---+------+----+---------+

这篇关于创建另一列以检查 pyspark 中的不同值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆