PySpark:PicklingError:无法序列化对象:TypeError:无法腌制CompiledFFI对象 [英] PySpark: PicklingError: Could not serialize object: TypeError: can't pickle CompiledFFI objects
问题描述
我是PySpark环境的新手,在尝试使用加密模块在RDD中加密数据时遇到错误.这是代码:
I'm new to the PySpark environment and came across an error while trying to encrypt data in an RDD with the cryptography module. Here's the code:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('encrypt').getOrCreate()
df = spark.read.csv('test.csv', inferSchema = True, header = True)
df.show()
df.printSchema()
from cryptography.fernet import Fernet
key = Fernet.generate_key()
f = Fernet(key)
dfRDD = df.rdd
print(dfRDD)
mappedRDD = dfRDD.map(lambda value: (value[0], str(f.encrypt(str.encode(value[1]))), value[2] * 100))
data = mappedRDD.toDF()
data.show()
在我尝试将value[1]
与str(f.encrypt(str.encode(value[1])))
映射之前,一切正常.我收到以下错误:
Everything works fine of course until I try mapping the value[1]
with str(f.encrypt(str.encode(value[1])))
. I receive the following error:
PicklingError:无法序列化对象:TypeError:无法使CompiledFFI对象腌制
PicklingError: Could not serialize object: TypeError: can't pickle CompiledFFI objects
我没有看到太多有关此错误的资源,而是想看看是否有人遇到过该错误(或者如果通过PySpark,您有推荐的列加密方法).
I have not seen too many resources referring to this error and wanted to see if anyone else has encountered it (or if via PySpark you have a recommended approach to column encryption).
推荐答案
推荐的列加密方法
recommended approach to column encryption
您可以考虑使用Hive内置加密功能( HIVE-5207 , HIVE-6329 ),但目前此功能相当有限( HIVE-7934 ).
You may consider Hive built-in encryption (HIVE-5207, HIVE-6329) but it is fairly limited at this moment (HIVE-7934).
您的当前代码不起作用,因为Fernet
对象不可序列化.您可以通过仅分发密钥来使其工作:
Your current code doesn't work because Fernet
objects are not serializable. You can make it work by distributing only keys:
def f(value, key=key):
return value[0], str(Fernet(key).encrypt(str.encode(value[1]))), value[2] * 100
mappedRDD = dfRDD.map(f)
或
def g(values, key=key):
f = Fernet(key)
for value in values:
yield value[0], str(f.encrypt(str.encode(value[1]))), value[2] * 100
mappedRDD = dfRDD.mapPartitions(g)
这篇关于PySpark:PicklingError:无法序列化对象:TypeError:无法腌制CompiledFFI对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!