PySpark returns an exception when I try to cast string columns as numeric
Problem description
I'm trying to cast string columns to numeric in PySpark, but I get an exception. I provide the code and the error message below.
Is it possible to import specific columns from the CSV file as numeric? (By default, they are imported as strings.)
What other options do I have?
My code and the error messages follow:
import pandas as pd
import seaborn as sns
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Loads data. Be careful of indentations and whitespace
spark = SparkSession.builder \
.master('local') \
.appName('Data cleaning') \
.getOrCreate()
# These lines enable the run of spark commands
from pyspark.context import SparkContext
#from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
#spark = SparkSession(sc)
import os
os.chdir('D:\\DIGITAL_LIBRARY\\DataCamp')
df = spark.read.format('csv').option('header','true').option('mode','DROPMALFORMED').\
load(r'D:\DIGITAL_LIBRARY\DataCamp\df.csv')
from pyspark.sql.functions import col
df.columns
['sku_id',
'promo_start_week',
'hierarchy2_name',
'brand',
'region',
'store_norm_group',
'holiday_names',
'holiday_types',
'list_price_net_q0.7',
'promoted_price_net_q0.7',
'list_price_net_q0.3_relative',
'discount_rate',
'promoted_price_net_q0.9',
'list_price_net_q0.3',
'list_price_net_q0.7_relative',
'promoted_price_net_q0.5_relative',
'promoted_price_net_q0.7_relative',
'promoted_price_net',
'promoted_price_net_q0.1_relative',
'list_price_net_q0.1',
'list_price_net_q0.5_relative',
'promoted_price_net_q0.3_relative',
'promoted_price_net_q0.5',
'list_price_net_q0.5',
'revenue',
'promoted_price_net_q0.3',
'list_price_net_q0.9',
'list_price_net_q0.1_relative',
'promoted_price_net_q0.9_relative',
'First_week_of_promo',
'list_price_net_q0.9_relative',
'promoted_price_net_q0.1']
cols_to_numeric = ['list_price_net_q0.7',
'promoted_price_net_q0.7',
'list_price_net_q0.3_relative',
'discount_rate',
'promoted_price_net_q0.9',
'list_price_net_q0.3',
'list_price_net_q0.7_relative',
'promoted_price_net_q0.5_relative',
'promoted_price_net_q0.7_relative',
'promoted_price_net',
'promoted_price_net_q0.1_relative',
'list_price_net_q0.1',
'list_price_net_q0.5_relative',
'promoted_price_net_q0.3_relative',
'promoted_price_net_q0.5',
'list_price_net_q0.5',
'revenue',
'promoted_price_net_q0.3',
'list_price_net_q0.9',
'list_price_net_q0.1_relative',
'promoted_price_net_q0.9_relative',
'First_week_of_promo',
'list_price_net_q0.9_relative',
'promoted_price_net_q0.1']
df1 = df.select(*(col(c).cast("float").alias(c) for c in cols_to_numeric))
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
D:\Spark\python\pyspark\sql\utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
D:\Spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
319 "An error occurred while calling {0}{1}{2}.\n".
--> 320 format(target_id, ".", name), value)
321 else:
Py4JJavaError: An error occurred while calling o36.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`list_price_net_q0.7`' given input columns: [promoted_price_net_q0.1, promoted_price_net_q0.1_relative, promo_start_week, promoted_price_net_q0.9, discount_rate, promoted_price_net, brand, holiday_names, list_price_net_q0.1, list_price_net_q0.7_relative, revenue, promoted_price_net_q0.7, First_week_of_promo, promoted_price_net_q0.5_relative, promoted_price_net_q0.3_relative, promoted_price_net_q0.5, list_price_net_q0.5, promoted_price_net_q0.9_relative, sku_id, promoted_price_net_q0.3, list_price_net_q0.3, list_price_net_q0.1_relative, hierarchy2_name, store_norm_group, list_price_net_q0.5_relative, list_price_net_q0.9_relative, region, promoted_price_net_q0.7_relative, list_price_net_q0.9, holiday_types, list_price_net_q0.7, list_price_net_q0.3_relative];;
'Project [cast('list_price_net_q0.7 as float) AS list_price_net_q0.7#109, cast('promoted_price_net_q0.7 as float) AS promoted_price_net_q0.7#110, cast('list_price_net_q0.3_relative as float) AS list_price_net_q0.3_relative#111, cast(discount_rate#22 as float) AS discount_rate#112, cast('promoted_price_net_q0.9 as float) AS promoted_price_net_q0.9#113, cast('list_price_net_q0.3 as float) AS list_price_net_q0.3#114, cast('list_price_net_q0.7_relative as float) AS list_price_net_q0.7_relative#115, cast('promoted_price_net_q0.5_relative as float) AS promoted_price_net_q0.5_relative#116, cast('promoted_price_net_q0.7_relative as float) AS promoted_price_net_q0.7_relative#117, cast(promoted_price_net#28 as float) AS promoted_price_net#118, cast('promoted_price_net_q0.1_relative as float) AS promoted_price_net_q0.1_relative#119, cast('list_price_net_q0.1 as float) AS list_price_net_q0.1#120, cast('list_price_net_q0.5_relative as float) AS list_price_net_q0.5_relative#121, cast('promoted_price_net_q0.3_relative as float) AS promoted_price_net_q0.3_relative#122, cast('promoted_price_net_q0.5 as float) AS promoted_price_net_q0.5#123, cast('list_price_net_q0.5 as float) AS list_price_net_q0.5#124, cast(revenue#35 as float) AS revenue#125, cast('promoted_price_net_q0.3 as float) AS promoted_price_net_q0.3#126, cast('list_price_net_q0.9 as float) AS list_price_net_q0.9#127, cast('list_price_net_q0.1_relative as float) AS list_price_net_q0.1_relative#128, cast('promoted_price_net_q0.9_relative as float) AS promoted_price_net_q0.9_relative#129, cast(First_week_of_promo#40 as float) AS First_week_of_promo#130, cast('list_price_net_q0.9_relative as float) AS list_price_net_q0.9_relative#131, cast('promoted_price_net_q0.1 as float) AS promoted_price_net_q0.1#132]
+- AnalysisBarrier
+- Project [sku_id#11, promo_start_week#12, hierarchy2_name#13, brand#14, region#15, store_norm_group#16, holiday_names#17, holiday_types#18, list_price_net_q0.7#19, promoted_price_net_q0.7#20, list_price_net_q0.3_relative#21, discount_rate#22, promoted_price_net_q0.9#23, list_price_net_q0.3#24, list_price_net_q0.7_relative#25, promoted_price_net_q0.5_relative#26, promoted_price_net_q0.7_relative#27, promoted_price_net#28, promoted_price_net_q0.1_relative#29, list_price_net_q0.1#30, list_price_net_q0.5_relative#31, promoted_price_net_q0.3_relative#32, promoted_price_net_q0.5#33, list_price_net_q0.5#34, ... 8 more fields]
+- Relation[_c0#10,sku_id#11,promo_start_week#12,hierarchy2_name#13,brand#14,region#15,store_norm_group#16,holiday_names#17,holiday_types#18,list_price_net_q0.7#19,promoted_price_net_q0.7#20,list_price_net_q0.3_relative#21,discount_rate#22,promoted_price_net_q0.9#23,list_price_net_q0.3#24,list_price_net_q0.7_relative#25,promoted_price_net_q0.5_relative#26,promoted_price_net_q0.7_relative#27,promoted_price_net#28,promoted_price_net_q0.1_relative#29,list_price_net_q0.1#30,list_price_net_q0.5_relative#31,promoted_price_net_q0.3_relative#32,promoted_price_net_q0.5#33,... 9 more fields] csv
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:120)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:120)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Unknown Source)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-7-f7e0007723d8> in <module>()
----> 1 df1 = df.select(*(col(c).cast("float").alias(c) for c in cols_to_numeric))
D:\Spark\python\pyspark\sql\dataframe.py in select(self, *cols)
1200 [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
1201 """
-> 1202 jdf = self._jdf.select(self._jcols(*cols))
1203 return DataFrame(jdf, self.sql_ctx)
1204
D:\Spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
1158 answer = self.gateway_client.send_command(command)
1159 return_value = get_return_value(
-> 1160 answer, self.gateway_client, self.target_id, self.name)
1161
1162 for temp_arg in temp_args:
D:\Spark\python\pyspark\sql\utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: "cannot resolve '`list_price_net_q0.7`' given input columns: [promoted_price_net_q0.1, promoted_price_net_q0.1_relative, promo_start_week, promoted_price_net_q0.9, discount_rate, promoted_price_net, brand, holiday_names, list_price_net_q0.1, list_price_net_q0.7_relative, revenue, promoted_price_net_q0.7, First_week_of_promo, promoted_price_net_q0.5_relative, promoted_price_net_q0.3_relative, promoted_price_net_q0.5, list_price_net_q0.5, promoted_price_net_q0.9_relative, sku_id, promoted_price_net_q0.3, list_price_net_q0.3, list_price_net_q0.1_relative, hierarchy2_name, store_norm_group, list_price_net_q0.5_relative, list_price_net_q0.9_relative, region, promoted_price_net_q0.7_relative, list_price_net_q0.9, holiday_types, list_price_net_q0.7, list_price_net_q0.3_relative];;\n'Project [cast('list_price_net_q0.7 as float) AS list_price_net_q0.7#109, cast('promoted_price_net_q0.7 as float) AS promoted_price_net_q0.7#110, cast('list_price_net_q0.3_relative as float) AS list_price_net_q0.3_relative#111, cast(discount_rate#22 as float) AS discount_rate#112, cast('promoted_price_net_q0.9 as float) AS promoted_price_net_q0.9#113, cast('list_price_net_q0.3 as float) AS list_price_net_q0.3#114, cast('list_price_net_q0.7_relative as float) AS list_price_net_q0.7_relative#115, cast('promoted_price_net_q0.5_relative as float) AS promoted_price_net_q0.5_relative#116, cast('promoted_price_net_q0.7_relative as float) AS promoted_price_net_q0.7_relative#117, cast(promoted_price_net#28 as float) AS promoted_price_net#118, cast('promoted_price_net_q0.1_relative as float) AS promoted_price_net_q0.1_relative#119, cast('list_price_net_q0.1 as float) AS list_price_net_q0.1#120, cast('list_price_net_q0.5_relative as float) AS list_price_net_q0.5_relative#121, cast('promoted_price_net_q0.3_relative as float) AS promoted_price_net_q0.3_relative#122, cast('promoted_price_net_q0.5 as float) AS promoted_price_net_q0.5#123, cast('list_price_net_q0.5 as float) AS 
list_price_net_q0.5#124, cast(revenue#35 as float) AS revenue#125, cast('promoted_price_net_q0.3 as float) AS promoted_price_net_q0.3#126, cast('list_price_net_q0.9 as float) AS list_price_net_q0.9#127, cast('list_price_net_q0.1_relative as float) AS list_price_net_q0.1_relative#128, cast('promoted_price_net_q0.9_relative as float) AS promoted_price_net_q0.9_relative#129, cast(First_week_of_promo#40 as float) AS First_week_of_promo#130, cast('list_price_net_q0.9_relative as float) AS list_price_net_q0.9_relative#131, cast('promoted_price_net_q0.1 as float) AS promoted_price_net_q0.1#132]\n+- AnalysisBarrier\n +- Project [sku_id#11, promo_start_week#12, hierarchy2_name#13, brand#14, region#15, store_norm_group#16, holiday_names#17, holiday_types#18, list_price_net_q0.7#19, promoted_price_net_q0.7#20, list_price_net_q0.3_relative#21, discount_rate#22, promoted_price_net_q0.9#23, list_price_net_q0.3#24, list_price_net_q0.7_relative#25, promoted_price_net_q0.5_relative#26, promoted_price_net_q0.7_relative#27, promoted_price_net#28, promoted_price_net_q0.1_relative#29, list_price_net_q0.1#30, list_price_net_q0.5_relative#31, promoted_price_net_q0.3_relative#32, promoted_price_net_q0.5#33, list_price_net_q0.5#34, ... 8 more fields]\n +- Relation[_c0#10,sku_id#11,promo_start_week#12,hierarchy2_name#13,brand#14,region#15,store_norm_group#16,holiday_names#17,holiday_types#18,list_price_net_q0.7#19,promoted_price_net_q0.7#20,list_price_net_q0.3_relative#21,discount_rate#22,promoted_price_net_q0.9#23,list_price_net_q0.3#24,list_price_net_q0.7_relative#25,promoted_price_net_q0.5_relative#26,promoted_price_net_q0.7_relative#27,promoted_price_net#28,promoted_price_net_q0.1_relative#29,list_price_net_q0.1#30,list_price_net_q0.5_relative#31,promoted_price_net_q0.3_relative#32,promoted_price_net_q0.5#33,... 9 more fields] csv\n"
A similar error is produced when I try:
df = df.withColumn('list_price_net_q0.7', col('list_price_net_q0.7').cast('float'))
Recommended answer
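Spark cannot resolve column names that contain dots when they are passed unquoted, because it interprets the dot as a separator for nested fields (the original answer describes this as a bug and links to a bug report for more information). The simplest fix is to clean up your column names with the code below: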
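from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.master('local').appName('Data cleaning').getOrCreate()
# Small example frame with one dotted column name that holds strings
columns = ['id', 'list_price_net_q0.7', 'bla']
vals = [(1.0, '2.0', 0), (2.0, '3.0', 1)]
df = spark.createDataFrame(vals, columns)
df.printSchema()
# Actual clean-up: replace every dot in the column names
x = [s.replace('.', 'DOT') for s in df.columns]
df = df.toDF(*x)
# Show that the cast works once the dots are gone
df.withColumn("float", col("list_price_net_q0DOT7").cast("float")).show()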
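If renaming the columns is not desirable, a commonly used alternative is to wrap each dotted name in backticks, so that Spark reads it as a single identifier instead of a nested-field path. The following is only a sketch: quote_name and cast_columns are hypothetical helper names (not part of PySpark), and it assumes the column names contain no backtick characters themselves.

```python
def quote_name(name):
    """Wrap a column name in backticks so Spark reads it as one identifier.

    Assumption: the name itself contains no backtick characters.
    """
    return "`{}`".format(name)


def cast_columns(df, names, dtype="float"):
    """Return df with the listed columns cast to dtype; other columns are kept as-is.

    Hypothetical helper. pyspark is imported lazily so that quote_name
    can be used without a Spark installation.
    """
    from pyspark.sql.functions import col

    targets = set(names)
    return df.select(*[
        # alias(c) keeps the original (dotted) name on the cast column
        col(quote_name(c)).cast(dtype).alias(c) if c in targets else col(quote_name(c))
        for c in df.columns
    ])
```

With the DataFrame from the question, this would be called as cast_columns(df, cols_to_numeric). The column names keep their dots, so any later reference to them needs the same backtick quoting.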
Another option is to set the inferSchema parameter to True when reading the file. This will probably create a DataFrame in which 'list_price_net_q0.7' is already a floating-point column, but as soon as you apply another function to that column you will stumble upon the same problem.
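For the part of the question about importing only specific columns as numeric, one possibility is to pass an explicit schema to the reader instead of inferring it. The sketch below assumes that you know every column in the file and its order up front; column_types and build_schema are hypothetical helper names introduced for illustration.

```python
def column_types(all_columns, numeric_columns):
    """Pair each column name with 'float' or 'string', preserving the given order."""
    numeric = set(numeric_columns)
    return [(name, "float" if name in numeric else "string") for name in all_columns]


def build_schema(all_columns, numeric_columns):
    """Build a StructType from the pairs above (hypothetical helper).

    pyspark is imported lazily so that column_types works without Spark.
    """
    from pyspark.sql.types import FloatType, StringType, StructField, StructType

    return StructType([
        StructField(name, FloatType() if kind == "float" else StringType(), True)
        for name, kind in column_types(all_columns, numeric_columns)
    ])
```

The resulting schema could then be passed as spark.read.csv(path, header=True, schema=build_schema(file_columns, cols_to_numeric)), where file_columns is an assumed list of all columns in file order. The schema has to cover every column in the file, and dotted names still need backticks or renaming in later expressions.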