How to resolve pickle error in PySpark?

Question

I am iterating through files to gather information about the values in their columns and rows in a dictionary. I have the following code which works locally:

from collections import defaultdict

def search_nulls(file_name):
    separator = ','
    fp = open(file_name, 'r')
    null_cols = {}
    lines = fp.readlines()

    for n, line in enumerate(lines):
        line = line.split(separator)
        for m, data in enumerate(line):
            data = data.strip('\n').strip('\r')
            # Lazily create a per-column counter keyed by column index.
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(lambda: 0)
            # Count short values (length <= 4) per column.
            if len(data) <= 4:
                null_cols[str(m)][str(data)] = null_cols[str(m)][str(data)] + 1

    return null_cols


files_to_process = ['tempfile.csv']
results = map(lambda file: search_nulls(file), files_to_process)

The above code works fine without Spark. I comment out the last two lines above and try with Spark, since this is a prototype of something that will need to run distributed:

import os
from pyspark import SparkConf, SparkContext

os.environ['SPARK_HOME'] = <path_to_spark_folder>
conf = SparkConf().setAppName("search_files").setMaster('local')

sc = SparkContext(conf=conf)

# Distribute the list of file names and run search_nulls on each one.
objects = sc.parallelize(files_to_process)
resulting_object = \
    objects.map(lambda file_object: search_nulls(file_object))

result = resulting_object.collect()

When using Spark, though, this results in the following error:

File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 267, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "<path-to-spark>/python/lib/pyspark.zip/pyspark/serializers.py", line 415, in dumps
    return pickle.dumps(obj, protocol)
TypeError: expected string or Unicode object, NoneType found

I've been unable to find any obvious reason why this would fail, since it runs perfectly locally, and I am not sharing any files across worker nodes. In fact, I'm only running this on my local machine anyway.

Does anyone know of a good reason why this might be failing?

Answer

The root of the problem is the following line:

null_cols[str(m)] = defaultdict(lambda: 0)

As you can read in the What can be pickled and unpickled? section of the pickle module documentation:

The following types can be pickled:

  • ...
  • functions defined at the top level of a module (using def, not lambda)
  • built-in functions defined at the top level of a module
  • ...

It should be clear that lambda: 0 doesn't meet the above criteria. To make it work you can, for example, replace the lambda expression with int:

null_cols[str(m)] = defaultdict(int)
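
For illustration, here is a minimal standalone check, outside Spark and using plain pickle, of why this substitution matters: defaultdict stores its default factory, so the factory itself must be picklable, and the built-in int is while a lambda is not.

import pickle
from collections import defaultdict

# int is a built-in defined at the top level, so this factory pickles fine.
ok = defaultdict(int)
ok['x'] += 1
print(pickle.loads(pickle.dumps(ok)))   # defaultdict(<class 'int'>, {'x': 1})

# A lambda factory makes the whole defaultdict unpicklable with plain pickle.
bad = defaultdict(lambda: 0)
try:
    pickle.dumps(bad)
except Exception as e:                  # PicklingError (exact type varies by Python version)
    print(type(e).__name__, e)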

How is it possible that we can pass lambda expressions to higher-order functions in PySpark at all? The devil is in the details. PySpark uses different serializers depending on the context. To serialize closures, including lambda expressions, it uses the custom cloudpickle, which supports lambda expressions and nested functions. To handle data it uses the default Python tools.
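
A small illustration of that split, assuming the standalone cloudpickle package is installed (PySpark bundles its own copy for closures): cloudpickle serializes a lambda by value, while plain pickle, which handles the data, rejects it.

import pickle
import cloudpickle   # third-party package; PySpark ships an equivalent internally

f = lambda: 0

# The closure serializer path: cloudpickle handles the lambda by value.
g = pickle.loads(cloudpickle.dumps(f))
print(g())           # 0

# The data serializer path: plain pickle refuses to serialize the lambda,
# which is what happens to defaultdict(lambda: 0) inside the returned result.
try:
    pickle.dumps(f)
except Exception as e:
    print(type(e).__name__, e)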

A few notes:

  • I wouldn't use Python file objects to read data. It is not portable and won't work beyond the local file system. You can use SparkContext.wholeTextFiles instead (see the sketch after this list).
  • If you do, make sure you close the connections. Using a with statement is usually the best approach.
  • You can safely strip newline characters before you split the line.
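
A rough sketch of the first point, assuming the input files are small enough to be read whole: SparkContext.wholeTextFiles returns (path, content) pairs, so the per-column counting from the question can run on each file's text. count_short_values is just a hypothetical helper name, and defaultdict(int) keeps the result picklable.

from collections import defaultdict
from pyspark import SparkConf, SparkContext

def count_short_values(content, separator=','):
    # Same counting as search_nulls, but fed a whole file's text
    # and built on defaultdict(int) so the result pickles cleanly.
    null_cols = {}
    for line in content.splitlines():
        for m, data in enumerate(line.split(separator)):
            data = data.strip()
            if str(m) not in null_cols:
                null_cols[str(m)] = defaultdict(int)
            if len(data) <= 4:
                null_cols[str(m)][data] += 1
    return null_cols

sc = SparkContext(conf=SparkConf().setAppName("search_files").setMaster('local'))

# One (path, content) pair per input file; no Python file objects involved.
results = (sc.wholeTextFiles('tempfile.csv')
             .mapValues(count_short_values)
             .collect())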
