Pyspark - ValueError:无法将字符串转换为浮点数/float() 的无效文字 [英] Pyspark - ValueError: could not convert string to float / invalid literal for float()

查看:39
本文介绍了Pyspark - ValueError:无法将字符串转换为浮点数/float() 的无效文字的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用来自 spark 数据帧的数据作为 k-means 模型的输入.但是我不断收到错误.(检查代码后的部分)

I am trying to use data from a spark dataframe as the input for my k-means model. However I keep getting errors. (Check section after code)

我的 spark 数据框看起来像这样(大约有 100 万行):

My spark dataframe and looks like this (and has around 1M rows):

ID            col1           col2        Latitude         Longitude
13            ...            ...           22.2             13.5
62            ...            ...           21.4             13.8
24            ...            ...           21.8             14.1
71            ...            ...           28.9             18.0
...           ...            ...           ....             ....

这是我的代码:

from pyspark.ml.clustering import KMeans 
from pyspark.ml.linalg import Vectors


df = spark.read.csv("file.csv")

spark_rdd = df.rdd.map(lambda row: (row["ID"], Vectors.dense(row["Latitude"],row["Longitude"])))
feature_df = spark_rdd.toDF(["ID", "features"])    

kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(feature_df)

sum_of_square_error = model.computeCost(feature_df)
    print str(sum_of_square_error)

centers = model.clusterCenters()

for center in centers:
    print(center)

但是,我收到错误:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-145-f50a6cbe7243> in <module>()
      7 
      8 kmeans = KMeans().setK(2).setSeed(1)
----> 9 model = kmeans.fit(feature_df)
     10 
     11 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    234 
    235     def _fit(self, dataset):
--> 236         java_model = self._fit_java(dataset)
    237         return self._create_model(java_model)
    238 

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    231         """
    232         self._transfer_params_to_java()
--> 233         return self._java_obj.fit(dataset._jdf)
    234 
    235     def _fit(self, dataset):

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3552.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 457.0 failed 4 times, most recent failure: Lost task 5.3 in stage 457.0 (TID 2308, 10.3.1.31, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
    process()
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
    return DenseVector(elements)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLF


    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:567)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.takeSample(RDD.scala:556)
    at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:353)
    at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
    at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
    at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:319)
    at sun.reflect.GeneratedMethodAccessor89.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 174, in main
    process()
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-145-f50a6cbe7243>", line 4, in <lambda>
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 790, in dense
    return DenseVector(elements)
  File "~/Downloads/spark-2.1.0-bin-hadoop2.7/python/pyspark/ml/linalg/__init__.py", line 275, in __init__
    ar = np.array(ar, dtype=np.float64)
ValueError: could not convert string to float: GOLFE 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

奇怪的是每次运行的错误都不一样.我得到的 3 种类型的错误是:

The strange thing is that the error is different every time I run it. The 3 types of errors I get are:

UnicodeEncodeError: 'decimal' 编解码器无法编码位置 3-5 的字符:无效的十进制 Unicode 字符串

float() 的文字无效:2017-04

ValueError: 无法将字符串转换为浮点数:GOLF

如果我错了,请纠正我,但我认为列中的某些数据值可能不正确(例如,纬度和经度列中偶尔会出现字符串)

Correct me if I am wrong, but I think some value of data in the columns may be incorrect (eg. occasional strings inside latitude and longitude column)

有没有办法检查纬度"每一行中的值是否实际上是一个浮点数?有没有办法检查ID"每一行中的值是否为整数?我想丢弃包含错误数据类型值的行.也许有一种方法可以使用 df.filter() 来做到这一点?

Is there a way to check if the value in each row of 'Latitude' is in fact a float? Is there a way to check if the value in each row of 'ID' is an integer? I would like to discard the rows which contain values of the incorrect data type. Perhaps there a way of doing this using df.filter()?

我将不胜感激任何帮助.谢谢.

I would greatly appreciate any help. Thanks.

更新:我什至尝试过 df.describe('ID', 'Latitude', 'Longitude').show() 它返回计数、平均值、标准偏差、最小值、最大值的数值每列的值,向我表明它们必须都是数字..?

UPDATE: I even tried df.describe('ID', 'Latitude', 'Longitude').show() and it returns numeric values for count, mean, stddev, min, max values for each column, indicating to me that they must all be numbers..?

推荐答案

你可能应该继续在同一个线程上,因为它是同样的问题.供参考:在pyspark中预处理数据

you should maybe have continued on the same thread since it's the same problem. For reference : Preprocessing data in pyspark

这里需要将Latitude/Longitude 转换为float 并使用dropna 去除空值,然后再将数据注入到Kmean 中,因为它似乎这些列包含一些无法转换为数值的字符串,因此使用以下内容预处理 df:

Here you need to convert Latitude / Longitude to float and remove null values with dropna before injecting the data in Kmean, because it seems these columns contain some strings that cannot be cast to a numeric value, so preprocess df with something like :

df2 = (df
       .withColumn("Latitude", col("Latitude").cast("float"))
       .withColumn("Longitude", col("Longitude").cast("float"))
       .dropna()
       )

spark_rdd = df2.rdd ...

这篇关于Pyspark - ValueError:无法将字符串转换为浮点数/float() 的无效文字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆