How can I get a distinct RDD of dicts in PySpark?


Question

I have an RDD of dictionaries, and I'd like to get an RDD of just the distinct elements. However, when I try to call

rdd.distinct()

PySpark gives me the following error:

TypeError: unhashable type: 'dict'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/02/19 16:55:56 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'dict'
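
For reference, a minimal sketch that reproduces this error with a local SparkContext (the sample dicts here are hypothetical):

from pyspark import SparkContext

sc = SparkContext("local", "distinct-dicts")

# distinct() deduplicates by hashing each element during the shuffle,
# and Python dicts are not hashable, so the action below fails
rdd = sc.parallelize([{"a": 1}, {"a": 1}, {"b": 2}])
rdd.distinct().collect()  # raises TypeError: unhashable type: 'dict'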

I do have a key inside of the dict that I could use as the distinct element, but the documentation doesn't give any clues on how to solve this problem.

EDIT: The content is made up of strings, arrays of strings, and a dictionary of numbers

EDIT 2: Example of a dictionary... I'd like dicts with equal "data_fingerprint" keys to be considered equal:

{"id":"4eece341","data_fingerprint":"1707db7bddf011ad884d132bf80baf3c"}

Thanks

Answer

As @zero323 pointed out in his comment, you have to decide how to compare dictionaries, since they are not hashable. One way is to sort the keys (as they are in no particular order), for example lexicographically, and then build a string of the form:

def dict_to_string(d):
    # Sort the keys so that logically equal dicts map to the same string,
    # e.g. 'key1|value1|key2|value2|...|keyn|valuen'
    return '|'.join('{}|{}'.format(k, d[k]) for k in sorted(d))

If you have nested unhashable objects, you have to do this recursively.
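
For nested data like that described in the edit above, a minimal sketch of one recursive approach (an assumption, not part of the original answer) converts every container into a hashable equivalent instead of a string:

def to_hashable(obj):
    # Dicts become sorted tuples of (key, value) pairs and lists become
    # tuples, so logically equal structures hash and compare equally.
    if isinstance(obj, dict):
        return tuple(sorted((k, to_hashable(v)) for k, v in obj.items()))
    if isinstance(obj, list):
        return tuple(to_hashable(x) for x in obj)
    return obj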

Now you can transform your RDD into key-value pairs, using the string (or some hash of it) as the key:

pairs = dictRDD.map(lambda d: (dict_to_string(d), d))

To get what you want, you just have to reduce by key as follows:

distinctDicts = pairs.reduceByKey(lambda val1, val2: val1).values()
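
Alternatively, since the question notes that dicts with equal "data_fingerprint" values should be treated as equal, a simpler sketch can key directly on that field rather than serializing the whole dict (this assumes the fingerprint fully determines equality):

distinctDicts = (dictRDD
                 .map(lambda d: (d["data_fingerprint"], d))
                 .reduceByKey(lambda val1, val2: val1)
                 .values())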
