Pyspark Data Frame: Access to a Column (TypeError: Column is not iterable)


Question

I am struggling with a PySpark code, in particular, I'd like to call a function on an object col which is not iterable.

from pyspark.sql.functions import col, lower, regexp_replace, split
from googletrans import Translator

def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, r"^rt ", "")
  c = regexp_replace(c, r"(https?\://)\S+", "")
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "") #removePunctuation 
  c = regexp_replace(c, r"\n", " ")
  c = regexp_replace(c, r"   ", " ")
  c = regexp_replace(c, r"  ", " ")  
#   c = translator.translate(c, dest='en', src='auto')
  return c

clean_text_df = uncleanedText.select(clean_text(col("unCleanedCol")).alias("sentence"))
clean_text_df.printSchema()
clean_text_df.show(10)
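For reference, the regex pipeline above can be sketched in plain Python with the `re` module. This is only an illustration of what the column expressions compute for each row value; Spark evaluates its built-in functions natively on the JVM:

```python
import re

def clean_text_py(s):
    # Plain-Python equivalent of the Spark column expressions above
    s = s.lower()
    s = re.sub(r"^rt ", "", s)            # drop a leading retweet marker
    s = re.sub(r"(https?://)\S+", "", s)  # strip URLs
    s = re.sub(r"[^a-zA-Z0-9\s]", "", s)  # remove punctuation
    s = re.sub(r"\n", " ", s)             # newlines to spaces
    s = re.sub(r" {2,}", " ", s)          # collapse runs of spaces
    return s

print(clean_text_py("RT Check https://example.com now!!\nPlease"))
# check now please
```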

As soon as I uncomment the line c = translator.translate(c, dest='en', src='auto'), the error Spark throws is TypeError: Column is not iterable.

What I would like to do is translate each sentence:

From:

+--------------------+
|            sentence|
+--------------------+
|ciao team there a...|
|dear itteam i urg...|
|buongiorno segnal...|
|hi team regarding...|
|hello please add ...|
|ciao vorrei effet...|
|buongiorno ho vis...|
+--------------------+

To:

+--------------------+
|            sentence|
+--------------------+
|hello team there ...|
|dear itteam i urg...|
|goodmorning segna...|
|hi team regarding...|
|hello please add ...|
|hello would effet...|
|goodmorning I see...|
+--------------------+

The schema of the DataFrame is:

root
 |-- sentence: string (nullable = true)

Can anybody help me?

Thank you very much.

Answer

PySpark is just the Python API written to support Apache Spark. If you want to use custom Python functions, you have to define a user-defined function (udf).

Keep your clean_text() function as is (with the translate line commented out) and try the following:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from googletrans import Translator

translator = Translator()

def translate(c):
  # googletrans returns a Translated object, so take .text to get the string
  return translator.translate(c, dest='en', src='auto').text

translateUDF = udf(translate, StringType())

clean_text_df = uncleanedText.select(
  translateUDF(clean_text(col("unCleanedCol"))).alias("sentence")
)
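Conceptually, udf registers a plain Python function that Spark calls once per row value, serializing each value out of the JVM to a Python worker and back. A rough plain-Python sketch of that row-at-a-time behaviour, with a hypothetical lookup table standing in for googletrans (illustration only, not Spark's actual machinery):

```python
def apply_row_udf(fn, column_values):
    # Stand-in for a Spark udf: the Python function runs once per row value,
    # which is why udfs are slower than built-in column expressions.
    return [fn(v) for v in column_values]

# Hypothetical per-sentence "translation" table, only for illustration
fake_translate = {
    "ciao team": "hello team",
    "buongiorno a tutti": "good morning everyone",
}

print(apply_row_udf(lambda s: fake_translate.get(s, s), ["ciao team", "buongiorno a tutti"]))
# ['hello team', 'good morning everyone']
```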

The other functions in your original clean_text (lower and regexp_replace) are built-in Spark functions and operate on a pyspark.sql.Column.

Be aware that using this udf will bring a performance hit. See: Spark functions vs UDF performance?

