Pyspark: PicklingError: Could not serialize object:


Problem Description

I have the following two data frames: df_whitelist and df_text

+-------+--------------------+
|keyword|    whitelist_terms |
+-------+--------------------+
|     LA|             LA city|
|     LA|        US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+

In df_whitelist, each keyword corresponds to a set of terms, e.g. keyword LA corresponds to "LA city" and "US LA in da". In df_text, I have text and some keywords found in this text. What I want to do is: for each piece of text, such as "the client has ada..", and for each of its keywords ("client" and "ada"), check through all the whitelist terms for that keyword to see how many times each term occurs in the text. What I have tried is the following:
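
In plain Python terms, the per-row logic I am after is roughly this sketch (the dict just hardcodes the truncated sample rows shown above, so the counts come out as 0 here):

# Plain-Python sketch of the intended per-row count (illustration only;
# the dict hardcodes the truncated sample data from the tables above).
whitelist = {"LA": ["LA city", "US LA in da"],
             "client": ["this client has i", "our client has do"]}

text, keywords = "the client as ada", "client;ada"
total = sum(text.count(term)
            for k in keywords.split(";")
            for term in whitelist.get(k, []))
print(total)  # 0 for these truncated strings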

import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def whitelisting(text,listOfKeyword,df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x:len(re.findall(x["whitelist_terms"],text))).reduce(lambda x, y: x+y)
            found_whiteterms_count = found_whiteterms_count + n    
    return found_whiteterms_count     
whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
df_text.withColumn("whitelist_counts", whitelisting_udf(df_text.Text, df_text.Keywords))

I got the following error:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

I cannot figure it out after trying for some time. Could anybody kindly point out the problem and how to fix it? Thanks.

Answer

You are passing a pyspark dataframe, df_whitelist, to a UDF, and pyspark dataframes cannot be pickled. You are also doing computations on a dataframe inside a UDF, which is not possible. Keep in mind that your function is going to be called as many times as there are rows in your dataframe, so you should keep computations simple, and resort to a UDF only when it cannot be done with pyspark sql functions.
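
As an aside, if a per-row lookup were genuinely unavoidable, only plain Python objects can be captured by a UDF closure. A minimal sketch, assuming the whitelist is small enough to collect onto the driver:

from collections import defaultdict

# Collect the small whitelist dataframe into a plain dict on the driver;
# unlike a dataframe, a dict pickles fine inside a UDF closure.
whitelist_map = defaultdict(list)
for row in df_whitelist.collect():
    whitelist_map[row["keyword"]].append(row["whitelist_terms"])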

What you need to do instead is to join the two dataframes on keyword. Let's start with the two sample dataframes you provided:

df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]], 
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]], 
    ["Text", "Keywords"])

The Keywords column in df_text needs some processing: we have to split the string into an array and then explode it so that we have only one item per row:

import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

    +-----------------+-------+
    |             Text|keyword|
    +-----------------+-------+
    |the client as ada| client|
    |the client as ada|    ada|
    |this client has l| client|
    |this client has l|     LA|
    +-----------------+-------+

Now we can join the two data frames on keyword:

df = df_text.join(df_whitelist, "keyword", "leftouter")

    +-------+-----------------+-----------------+
    |keyword|             Text|  whitelist_terms|
    +-------+-----------------+-----------------+
    |     LA|this client has l|          LA city|
    |     LA|this client has l|      US LA in da|
    |    ada|the client as ada|             null|
    | client|the client as ada|this client has i|
    | client|the client as ada|       our client|
    | client|this client has l|this client has i|
    | client|this client has l|       our client|
    +-------+-----------------+-----------------+

  • The first condition in your UDF can be translated as follows: if the keyword in df_text is not present in df_whitelist, then count 0. This is equivalent to saying that the df_whitelist columns will be NULL in the left join, since those rows only appear in the left data frame.

  • The second condition: you count the number of times whitelist_terms appears in Text, i.e. Text.count(whitelist_terms).

We'll write a UDF to do this:

from pyspark.sql.types import IntegerType
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))
      
    +-----------------+-------+----------------+
    |             Text|keyword|whitelist_counts|
    +-----------------+-------+----------------+
    |this client has l|     LA|               0|
    |this client has l|     LA|               0|
    |the client as ada|    ada|               0|
    |the client as ada| client|               0|
    |the client as ada| client|               0|
    |this client has l| client|               0|
    |this client has l| client|               0|
    +-----------------+-------+----------------+
      

Finally, we can aggregate to get back to a dataframe with only distinct Text:

res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()
      
    +-----------------+-------------+----------------+
    |             Text|     Keywords|whitelist_counts|
    +-----------------+-------------+----------------+
    |this client has l| [client, LA]|               0|
    |the client as ada|[ada, client]|               0|
    +-----------------+-------------+----------------+
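
As an aside, the substring count can also be expressed with built-in SQL functions through F.expr, avoiding the Python UDF entirely. A sketch, assuming a Spark version with the SQL replace() function (2.3+); the df_alt name is just for illustration:

# Occurrences of a term = (length of Text minus length of Text with the
# term removed) divided by the term's length. replace() is Spark SQL >= 2.3.
occ = F.expr(
    "(length(Text) - length(replace(Text, whitelist_terms))) "
    "/ length(whitelist_terms)")
df_alt = df_text.join(df_whitelist, "keyword", "leftouter").select(
    "Text",
    "keyword",
    F.when(F.isnull("whitelist_terms"), 0)
     .otherwise(occ.cast("int"))
     .alias("whitelist_counts"))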
      
