PySpark returns an exception when I try to cast string columns as numeric


Problem Description

I'm trying to cast string columns to numeric, but I am getting an exception in PySpark. I provide below the code and the error message.

Is it possible to import the specific columns from the csv file as numeric? (the default is to be imported as strings).

What other options do I have?

My code and the error messages follow below:

import pandas as pd
import seaborn as sns

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# Load the data. Be careful of indentation and whitespace.
spark = SparkSession.builder \
    .master('local') \
    .appName('Data cleaning') \
    .getOrCreate()

# These lines enable running Spark commands
from pyspark.context import SparkContext
#from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
#spark = SparkSession(sc)

import os
os.chdir('D:\\DIGITAL_LIBRARY\\DataCamp')

df = spark.read.format('csv').option('header', 'true').option('mode', 'DROPMALFORMED') \
    .load('D:\\DIGITAL_LIBRARY\\DataCamp\\df.csv')

from pyspark.sql.functions import *

df.columns
['sku_id',
 'promo_start_week',
 'hierarchy2_name',
 'brand',
 'region',
 'store_norm_group',
 'holiday_names',
 'holiday_types',
 'list_price_net_q0.7',
 'promoted_price_net_q0.7',
 'list_price_net_q0.3_relative',
 'discount_rate',
 'promoted_price_net_q0.9',
 'list_price_net_q0.3',
 'list_price_net_q0.7_relative',
 'promoted_price_net_q0.5_relative',
 'promoted_price_net_q0.7_relative',
 'promoted_price_net',
 'promoted_price_net_q0.1_relative',
 'list_price_net_q0.1',
 'list_price_net_q0.5_relative',
 'promoted_price_net_q0.3_relative',
 'promoted_price_net_q0.5',
 'list_price_net_q0.5',
 'revenue',
 'promoted_price_net_q0.3',
 'list_price_net_q0.9',
 'list_price_net_q0.1_relative',
 'promoted_price_net_q0.9_relative',
 'First_week_of_promo',
 'list_price_net_q0.9_relative',
 'promoted_price_net_q0.1']

cols_to_numeric = ['list_price_net_q0.7',
 'promoted_price_net_q0.7',
 'list_price_net_q0.3_relative',
 'discount_rate',
 'promoted_price_net_q0.9',
 'list_price_net_q0.3',
 'list_price_net_q0.7_relative',
 'promoted_price_net_q0.5_relative',
 'promoted_price_net_q0.7_relative',
 'promoted_price_net',
 'promoted_price_net_q0.1_relative',
 'list_price_net_q0.1',
 'list_price_net_q0.5_relative',
 'promoted_price_net_q0.3_relative',
 'promoted_price_net_q0.5',
 'list_price_net_q0.5',
 'revenue',
 'promoted_price_net_q0.3',
 'list_price_net_q0.9',
 'list_price_net_q0.1_relative',
 'promoted_price_net_q0.9_relative',
 'First_week_of_promo',
 'list_price_net_q0.9_relative',
 'promoted_price_net_q0.1']

df1 = df.select(*(col(c).cast("float").alias(c) for c in cols_to_numeric))
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
D:\Spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

D:\Spark\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:

Py4JJavaError: An error occurred while calling o36.select.
: org.apache.spark.sql.AnalysisException: cannot resolve '`list_price_net_q0.7`' given input columns: [promoted_price_net_q0.1, promoted_price_net_q0.1_relative, promo_start_week, promoted_price_net_q0.9, discount_rate, promoted_price_net, brand, holiday_names, list_price_net_q0.1, list_price_net_q0.7_relative, revenue, promoted_price_net_q0.7, First_week_of_promo, promoted_price_net_q0.5_relative, promoted_price_net_q0.3_relative, promoted_price_net_q0.5, list_price_net_q0.5, promoted_price_net_q0.9_relative, sku_id, promoted_price_net_q0.3, list_price_net_q0.3, list_price_net_q0.1_relative, hierarchy2_name, store_norm_group, list_price_net_q0.5_relative, list_price_net_q0.9_relative, region, promoted_price_net_q0.7_relative, list_price_net_q0.9, holiday_types, list_price_net_q0.7, list_price_net_q0.3_relative];;
'Project [cast('list_price_net_q0.7 as float) AS list_price_net_q0.7#109, cast('promoted_price_net_q0.7 as float) AS promoted_price_net_q0.7#110, cast('list_price_net_q0.3_relative as float) AS list_price_net_q0.3_relative#111, cast(discount_rate#22 as float) AS discount_rate#112, cast('promoted_price_net_q0.9 as float) AS promoted_price_net_q0.9#113, cast('list_price_net_q0.3 as float) AS list_price_net_q0.3#114, cast('list_price_net_q0.7_relative as float) AS list_price_net_q0.7_relative#115, cast('promoted_price_net_q0.5_relative as float) AS promoted_price_net_q0.5_relative#116, cast('promoted_price_net_q0.7_relative as float) AS promoted_price_net_q0.7_relative#117, cast(promoted_price_net#28 as float) AS promoted_price_net#118, cast('promoted_price_net_q0.1_relative as float) AS promoted_price_net_q0.1_relative#119, cast('list_price_net_q0.1 as float) AS list_price_net_q0.1#120, cast('list_price_net_q0.5_relative as float) AS list_price_net_q0.5_relative#121, cast('promoted_price_net_q0.3_relative as float) AS promoted_price_net_q0.3_relative#122, cast('promoted_price_net_q0.5 as float) AS promoted_price_net_q0.5#123, cast('list_price_net_q0.5 as float) AS list_price_net_q0.5#124, cast(revenue#35 as float) AS revenue#125, cast('promoted_price_net_q0.3 as float) AS promoted_price_net_q0.3#126, cast('list_price_net_q0.9 as float) AS list_price_net_q0.9#127, cast('list_price_net_q0.1_relative as float) AS list_price_net_q0.1_relative#128, cast('promoted_price_net_q0.9_relative as float) AS promoted_price_net_q0.9_relative#129, cast(First_week_of_promo#40 as float) AS First_week_of_promo#130, cast('list_price_net_q0.9_relative as float) AS list_price_net_q0.9_relative#131, cast('promoted_price_net_q0.1 as float) AS promoted_price_net_q0.1#132]
+- AnalysisBarrier
      +- Project [sku_id#11, promo_start_week#12, hierarchy2_name#13, brand#14, region#15, store_norm_group#16, holiday_names#17, holiday_types#18, list_price_net_q0.7#19, promoted_price_net_q0.7#20, list_price_net_q0.3_relative#21, discount_rate#22, promoted_price_net_q0.9#23, list_price_net_q0.3#24, list_price_net_q0.7_relative#25, promoted_price_net_q0.5_relative#26, promoted_price_net_q0.7_relative#27, promoted_price_net#28, promoted_price_net_q0.1_relative#29, list_price_net_q0.1#30, list_price_net_q0.5_relative#31, promoted_price_net_q0.3_relative#32, promoted_price_net_q0.5#33, list_price_net_q0.5#34, ... 8 more fields]
         +- Relation[_c0#10,sku_id#11,promo_start_week#12,hierarchy2_name#13,brand#14,region#15,store_norm_group#16,holiday_names#17,holiday_types#18,list_price_net_q0.7#19,promoted_price_net_q0.7#20,list_price_net_q0.3_relative#21,discount_rate#22,promoted_price_net_q0.9#23,list_price_net_q0.3#24,list_price_net_q0.7_relative#25,promoted_price_net_q0.5_relative#26,promoted_price_net_q0.7_relative#27,promoted_price_net#28,promoted_price_net_q0.1_relative#29,list_price_net_q0.1#30,list_price_net_q0.5_relative#31,promoted_price_net_q0.3_relative#32,promoted_price_net_q0.5#33,... 9 more fields] csv

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:116)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:120)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:120)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3295)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1307)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-7-f7e0007723d8> in <module>()
----> 1 df1 = df.select(*(col(c).cast("float").alias(c) for c in cols_to_numeric))

D:\Spark\python\pyspark\sql\dataframe.py in select(self, *cols)
   1200         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1201         """
-> 1202         jdf = self._jdf.select(self._jcols(*cols))
   1203         return DataFrame(jdf, self.sql_ctx)
   1204 

D:\Spark\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

D:\Spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve '`list_price_net_q0.7`' given input columns: [promoted_price_net_q0.1, promoted_price_net_q0.1_relative, promo_start_week, promoted_price_net_q0.9, discount_rate, promoted_price_net, brand, holiday_names, list_price_net_q0.1, list_price_net_q0.7_relative, revenue, promoted_price_net_q0.7, First_week_of_promo, promoted_price_net_q0.5_relative, promoted_price_net_q0.3_relative, promoted_price_net_q0.5, list_price_net_q0.5, promoted_price_net_q0.9_relative, sku_id, promoted_price_net_q0.3, list_price_net_q0.3, list_price_net_q0.1_relative, hierarchy2_name, store_norm_group, list_price_net_q0.5_relative, list_price_net_q0.9_relative, region, promoted_price_net_q0.7_relative, list_price_net_q0.9, holiday_types, list_price_net_q0.7, list_price_net_q0.3_relative];;\n'Project [cast('list_price_net_q0.7 as float) AS list_price_net_q0.7#109, cast('promoted_price_net_q0.7 as float) AS promoted_price_net_q0.7#110, cast('list_price_net_q0.3_relative as float) AS list_price_net_q0.3_relative#111, cast(discount_rate#22 as float) AS discount_rate#112, cast('promoted_price_net_q0.9 as float) AS promoted_price_net_q0.9#113, cast('list_price_net_q0.3 as float) AS list_price_net_q0.3#114, cast('list_price_net_q0.7_relative as float) AS list_price_net_q0.7_relative#115, cast('promoted_price_net_q0.5_relative as float) AS promoted_price_net_q0.5_relative#116, cast('promoted_price_net_q0.7_relative as float) AS promoted_price_net_q0.7_relative#117, cast(promoted_price_net#28 as float) AS promoted_price_net#118, cast('promoted_price_net_q0.1_relative as float) AS promoted_price_net_q0.1_relative#119, cast('list_price_net_q0.1 as float) AS list_price_net_q0.1#120, cast('list_price_net_q0.5_relative as float) AS list_price_net_q0.5_relative#121, cast('promoted_price_net_q0.3_relative as float) AS promoted_price_net_q0.3_relative#122, cast('promoted_price_net_q0.5 as float) AS promoted_price_net_q0.5#123, cast('list_price_net_q0.5 as float) AS list_price_net_q0.5#124, cast(revenue#35 as float) AS revenue#125, cast('promoted_price_net_q0.3 as float) AS promoted_price_net_q0.3#126, cast('list_price_net_q0.9 as float) AS list_price_net_q0.9#127, cast('list_price_net_q0.1_relative as float) AS list_price_net_q0.1_relative#128, cast('promoted_price_net_q0.9_relative as float) AS promoted_price_net_q0.9_relative#129, cast(First_week_of_promo#40 as float) AS First_week_of_promo#130, cast('list_price_net_q0.9_relative as float) AS list_price_net_q0.9_relative#131, cast('promoted_price_net_q0.1 as float) AS promoted_price_net_q0.1#132]\n+- AnalysisBarrier\n      +- Project [sku_id#11, promo_start_week#12, hierarchy2_name#13, brand#14, region#15, store_norm_group#16, holiday_names#17, holiday_types#18, list_price_net_q0.7#19, promoted_price_net_q0.7#20, list_price_net_q0.3_relative#21, discount_rate#22, promoted_price_net_q0.9#23, list_price_net_q0.3#24, list_price_net_q0.7_relative#25, promoted_price_net_q0.5_relative#26, promoted_price_net_q0.7_relative#27, promoted_price_net#28, promoted_price_net_q0.1_relative#29, list_price_net_q0.1#30, list_price_net_q0.5_relative#31, promoted_price_net_q0.3_relative#32, promoted_price_net_q0.5#33, list_price_net_q0.5#34, ... 
8 more fields]\n         +- Relation[_c0#10,sku_id#11,promo_start_week#12,hierarchy2_name#13,brand#14,region#15,store_norm_group#16,holiday_names#17,holiday_types#18,list_price_net_q0.7#19,promoted_price_net_q0.7#20,list_price_net_q0.3_relative#21,discount_rate#22,promoted_price_net_q0.9#23,list_price_net_q0.3#24,list_price_net_q0.7_relative#25,promoted_price_net_q0.5_relative#26,promoted_price_net_q0.7_relative#27,promoted_price_net#28,promoted_price_net_q0.1_relative#29,list_price_net_q0.1#30,list_price_net_q0.5_relative#31,promoted_price_net_q0.3_relative#32,promoted_price_net_q0.5#33,... 9 more fields] csv\n"

A similar error is produced when I try:

df = df.withColumn('list_price_net_q0.7', col('list_price_net_q0.7').cast('float'))

Recommended Answer

Spark cannot resolve column names with dots due to a bug (see this bug report for more information). Just use the code below to clean up your column names:

from pyspark.sql.functions import col

# Reproduce the problem on a small example DataFrame with a dotted column name
columns = ['id', 'list_price_net_q0.7', 'bla']
vals = [(1.0, '2.0', 0), (2.0, '3.0', 1)]

df = spark.createDataFrame(vals, columns)
df.printSchema()

# Actual clean-up: replace the dots in every column name
x = [s.replace('.', 'DOT') for s in df.columns]
df = df.toDF(*x)

# Prove that you can cast now
df.withColumn("float", col("list_price_net_q0DOT7").cast("float")).show()

Another option is to set the inferSchema parameter to True. This will probably create a DataFrame where 'list_price_net_q0.7' is already a column of floats, but as soon as you apply another function to that column you will stumble upon the same bug.
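For completeness, here is a minimal sketch of both read options, reusing the file path from the question; everything else is illustrative. Note that when you pass an explicit schema to the CSV reader, it must declare every column in file order, so the three fields below only show the pattern.

from pyspark.sql.types import StructType, StructField, StringType, FloatType

# Option A: let Spark infer the column types while reading.
df_inferred = spark.read.csv('D:\\DIGITAL_LIBRARY\\DataCamp\\df.csv',
                             header=True, inferSchema=True)

# Option B: declare the types up front, so no cast is needed afterwards.
# The schema must list every column in the CSV, in order.
schema = StructType([
    StructField('sku_id', StringType()),
    StructField('discount_rate', FloatType()),
    StructField('revenue', FloatType()),
    # ... remaining fields
])
df_typed = spark.read.csv('D:\\DIGITAL_LIBRARY\\DataCamp\\df.csv',
                          header=True, schema=schema)

Either way, columns whose names contain dots will still hit the resolution bug as soon as you reference them, so the renaming above is still worth doing.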

