DF.toPandas() throwing error in PySpark


Question


I am processing a huge text file using PyCharm and PySpark.

This is what I am trying to do:

import os
spark_home = os.environ.get('SPARK_HOME', None)
os.environ["SPARK_HOME"] = r"C:\spark-2.3.0-bin-hadoop2.7"
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate() 
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"some other file.csv")
kw = pd.read_csv(r"some file.csv",encoding='ISO-8859-1',index_col=False,error_bad_lines=False)
for i in range(len(kw)):
    rx = '(?i)'+kw.Keywords[i]
    ip = ip.where(~ip['Content'].rlike(rx))
op = ip.toPandas()
op.to_csv(r'something.csv',encoding='utf-8')

However, PyCharm is throwing me this error:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-06-08 11:31:52 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
Traceback (most recent call last):
  File "C:/Users/mainak.paul/PycharmProjects/Concept_Building_SIP/ThemeSparkUncoveredGames.py", line 17, in <module>
    op = ip.toPandas()
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 1966, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.IllegalArgumentException

I am just not getting why .toPandas() is not working. Spark version is 2.3. Has something changed in this version that I am unaware of? I ran this code in a different machine with spark 2.2, and it ran fine.

I even changed the export line to something like this

op = ip.where(ip['Content'].rlike(rx)).toPandas()

Still getting the same error. What am I doing wrong? Is there some other way of exporting pyspark.sql.dataframe.DataFrame to a .csv without compromising on performance?

EDITED: I also tried using:

ip.write.csv('file.csv')

Now I am getting the following error:

Traceback (most recent call last):
  File "somefile.csv", line 21, in <module>
    ip.write.csv('somefile.csv')
  File "C:\Python27\lib\site-packages\pyspark\sql\readwriter.py", line 883, in csv
    self._jwrite.csv(path)
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o102.csv.

Adding the stacktrace:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/11 16:53:14 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable C:\spark-2.3.0-bin-hadoop2.7\bin\bin\winutils.exe in the Hadoop binaries.
    at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:379)
    at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:394)
    at org.apache.hadoop.util.Shell.<clinit>(Shell.java:387)
    at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
    at org.apache.hadoop.security.SecurityUtil.getAuthenticationMethod(SecurityUtil.java:611)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:273)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:261)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:791)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:761)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:634)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2430)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2430)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:488)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:236)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/C:/opt/spark/spark-2.2.0-bin-hadoop2.7/jars/hadoop-auth-2.7.3.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
18/06/11 16:53:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "C:/Users/mainak.paul/PycharmProjects/Concept_Building_SIP/ThemeSparkUncoveredGames.py", line 22, in <module>
    op = ip.toPandas().collect()
  File "C:\Python27\lib\site-packages\pyspark\sql\dataframe.py", line 1937, in toPandas
    if self.sql_ctx.getConf("spark.sql.execution.pandas.respectSessionTimeZone").lower() \
  File "C:\Python27\lib\site-packages\pyspark\sql\context.py", line 142, in getConf
    return self.sparkSession.conf.get(key, defaultValue)
  File "C:\Python27\lib\site-packages\pyspark\sql\conf.py", line 46, in get
    return self._jconf.get(key)
  File "C:\Python27\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Python27\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Python27\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o86.get.
: java.util.NoSuchElementException: spark.sql.execution.pandas.respectSessionTimeZone
    at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:1089)
    at org.apache.spark.sql.internal.SQLConf$$anonfun$getConfString$2.apply(SQLConf.scala:1089)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.internal.SQLConf.getConfString(SQLConf.scala:1089)
    at org.apache.spark.sql.RuntimeConfig.get(RuntimeConfig.scala:74)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)


Process finished with exit code 1

Solution

You need to change your code as follows:

import os
spark_home = os.environ.get('SPARK_HOME', None)
os.environ["SPARK_HOME"] = r"C:\spark-2.3.0-bin-hadoop2.7"
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).getOrCreate() 
import pandas as pd
ip = spark.read.format("csv").option("inferSchema","true").option("header","true").load(r"some other file.csv")
kw = pd.read_csv(r"some file.csv",encoding='ISO-8859-1',index_col=False,error_bad_lines=False)
for i in range(len(kw)):
    rx = '(?i)'+kw.Keywords[i]
    ip = ip.where(~ip['Content'].rlike(rx))
op = ip.toPandas().collect()
op.to_csv(r'something.csv',encoding='utf-8')

toPandas() needs to be followed by a collect() action in PySpark for the DataFrame to materialize. This, however, should not be done for large datasets, as toPandas().collect() moves all of the data to the driver, which can crash if the dataset is too big to fit into driver memory.
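As a rough illustration of that driver-memory concern, here is a minimal sketch of my own (not part of the original answer; the row threshold and output folder are illustrative) that checks the size of the filtered result before pulling it onto the driver:

# Sketch only: the 1,000,000-row threshold is an arbitrary illustration.
row_count = ip.count()            # runs a Spark job and returns the row count to the driver
if row_count < 1000000:
    # small enough to fit on the driver: convert and save with pandas
    ip.toPandas().to_csv(r'something.csv', encoding='utf-8', index=False)
else:
    # too large for the driver: write the CSV with Spark instead (see below)
    ip.write.csv('file:///C:/spark_out/filtered')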

As for the line ip.write.csv('file.csv'): I believe it should be changed to ip.write.csv('file:///home/your-user-name/file.csv') to save the file on the local Linux filesystem,

ip.option("header", "true").csv("file:///C:/out.csv") to save the file on the local windows filesystem (if you are running Spark and Hadoop on Windows)

or ip.write.csv('hdfs:///user/your-user/file.csv') to save the file to HDFS.
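Putting those pieces together, a minimal sketch for the Windows case (the output folder name is illustrative) could look like the following; note that Spark writes a directory of part files, and coalesce(1) collapses the output to a single part file, which is only sensible for modestly sized results:

# Sketch: single-part CSV with a header row on the local Windows filesystem
ip.coalesce(1) \
  .write \
  .mode('overwrite') \
  .option('header', 'true') \
  .csv('file:///C:/spark_out/filtered')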

Do tell me if this solution worked out for you.

UPDATE

Follow this link and download the winutils.exe file: https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin. Create a folder named hadoop on your C: drive and another folder called bin inside the hadoop folder. Place the winutils.exe you downloaded into that bin folder. Then edit the system environment variables and add a HADOOP_HOME variable pointing to the hadoop folder (C:\hadoop). Once this is done you won't get the winutils/Hadoop error from Spark.

. Just type "Edit the system environment variables" in your windows search
