pyspark generate row hash of specific columns and add it as a new column


Problem description

I am working with Spark 2.2.0 and PySpark 2.

I have created a DataFrame df and am now trying to add a new column "rowhash" that is the SHA-2 hash of specific columns in the DataFrame.

For example, say df has columns: (column1, column2, ..., column10).

I require sha2((column2||column3||column4||...||column8), 256) in a new column "rowhash".

For now, I have tried the following methods:

1) Used the hash() function, but since it gives an integer output it is not of much use.

2) Tried using the sha2() function, but it is failing.

Say columnarray holds the array of columns I need.

def concat(columnarray):
    # join the values with '||' and strip the leading separator
    concat_str = ''
    for val in columnarray:
        concat_str = concat_str + '||' + str(val)
    concat_str = concat_str[2:]
    return concat_str

and then:

df1 = df1.withColumn("row_sha2", sha2(concat(columnarray),256))

This is failing with a "cannot resolve" error.

Thanks gaw for your answer. Since I have to hash only specific columns, I created a list of those column names (in hash_col) and changed your function to:

def sha_concat(row, columnarray):
    row_dict = row.asDict()      # transform row to a dict
    concat_str = ''
    for v in columnarray:
        concat_str = concat_str + '||' + str(row_dict.get(v))
    concat_str = concat_str[2:]
    # preserve concatenated value for testing (this can be removed later)
    row_dict["sha_values"] = concat_str
    row_dict["sha_hash"] = hashlib.sha256(concat_str).hexdigest()
    return Row(**row_dict)

and then called it as:

    df1.rdd.map(lambda row: sha_concat(row,hash_col)).toDF().show(truncate=False)

It is now, however, failing with the error:

    UnicodeEncodeError: 'ascii' codec can't encode character u'\ufffd' in position 8: ordinal not in range(128)

I can see a value of \ufffd in one of the columns, so I am unsure if there is a way to handle this?
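A possible workaround (a minimal sketch, not from the original thread, assuming Python 2 since the error shows a u'' literal): build the concatenated value as unicode and encode it to UTF-8 bytes before hashing, so hashlib never attempts an implicit ASCII encode.

import hashlib
from pyspark.sql import Row

def sha_concat(row, columnarray):
    row_dict = row.asDict()
    # use unicode() rather than str() so non-ASCII values survive (Python 2)
    concat_str = u'||'.join(unicode(row_dict.get(v)) for v in columnarray)
    row_dict["sha_values"] = concat_str
    # encode to UTF-8 bytes before hashing; passing a unicode string to
    # hashlib.sha256 triggers the implicit ASCII encode that raises
    # UnicodeEncodeError on characters such as u'\ufffd'
    row_dict["sha_hash"] = hashlib.sha256(concat_str.encode('utf-8')).hexdigest()
    return Row(**row_dict)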

Recommended answer

You can use pyspark.sql.functions.concat_ws() to concatenate your columns and pyspark.sql.functions.sha2() to get the SHA-256 hash.

Using the data from @gaw:

from pyspark.sql.functions import sha2, concat_ws
df = spark.createDataFrame(
    [(1,"2",5,1),(3,"4",7,8)],
    ("col1","col2","col3","col4")
)
df.withColumn("row_sha2", sha2(concat_ws("||", *df.columns), 256)).show(truncate=False)
#+----+----+----+----+----------------------------------------------------------------+
#|col1|col2|col3|col4|row_sha2                                                        |
#+----+----+----+----+----------------------------------------------------------------+
#|1   |2   |5   |1   |1b0ae4beb8ce031cf585e9bb79df7d32c3b93c8c73c27d8f2c2ddc2de9c8edcd|
#|3   |4   |7   |8   |57f057bdc4178b69b1b6ab9d78eabee47133790cba8cf503ac1658fa7a496db1|
#+----+----+----+----+----------------------------------------------------------------+

You can pass in either 0 or 256 as the second argument to sha2(), as per the docs:

Returns the hex string result of SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512). The numBits indicates the desired bit length of the result, which must have a value of 224, 256, 384, 512, or 0 (which is equivalent to 256).
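For instance (a minimal sketch reusing the df created above), passing 0 should produce the same digest as passing 256:

from pyspark.sql.functions import sha2, concat_ws

# numBits=0 is documented as equivalent to 256, so both columns should match
df.select(
    sha2(concat_ws("||", *df.columns), 256).alias("sha_256"),
    sha2(concat_ws("||", *df.columns), 0).alias("sha_0")
).show(truncate=False)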

The function concat_ws takes a separator and a list of columns to join. I am passing in || as the separator and df.columns as the list of columns.

I am using all of the columns here, but you can specify whatever subset of columns you'd like - in your case that would be columnarray. (You need to use the * to unpack the list.) See the sketch below.
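A minimal sketch, assuming columnarray holds the names of the columns from the question (column2 through column8 here are placeholders for whatever names you actually have):

from pyspark.sql.functions import sha2, concat_ws

# columnarray is assumed to be a list of column name strings
columnarray = ["column2", "column3", "column4", "column5", "column6", "column7", "column8"]

df1 = df1.withColumn("row_sha2", sha2(concat_ws("||", *columnarray), 256))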
