Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKey


Problem Description


my "asdasd.csv" file has the following structure.

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

OK, I get the following {key, value} tuple to operate on:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 

My code for calculating the mean is the following; I have to calculate the mean of each column (X, Y, Z) for each key.

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])) \
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

My problem is that I tried this code and it works fine on another PC with the same VM (PySpark, Py3) that I'm using to develop it.

Here is an example showing that the code runs correctly there.

But I don't know why I'm getting this error; the important part is the TypeError at the end of the traceback:

--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) in () 9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x0+y0,x[0][2]+y[0][2])) 10 ---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num) 1341
1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 990 # SparkContext#runJob. 991 mappedRDD = rdd.mapPartitions(partitionFunc) --> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}. ". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main process() File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func return f(iterator) File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally merger.mergeValues(iterator) File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues d[k] = comb(d[k], v) if k in d else creator(v) File "", line 3, in TypeError: 'float' object is not subscriptable

Solution

Here's how reduceByKey works. I am taking your example for illustration, i.e. the following data that you pass to reduceByKey:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#           part A (key)               part B (value)       counter
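If you want to follow along without the CSV file, here is a minimal sketch that builds the equivalent of rdd_ori (before mapValues) in memory from the four sample rows in the question. The variable names and the use of SparkContext.getOrCreate are my assumptions, not part of the original post.

from pyspark import SparkContext

# Reuse the shell's SparkContext if one already exists.
sc = SparkContext.getOrCreate()

# The four sample rows from the question, already keyed by (User, Model, gt).
rows = [
    (('a', 'nexus4', 'stand'), (-5.958191, 0.6880646, 8.135345)),
    (('a', 'nexus4', 'stand'), (-5.95224, 0.6702118, 8.136536)),
    (('a', 'nexus4', 'stand'), (-5.9950867, 0.6535491999999999, 8.204376)),
    (('a', 'nexus4', 'stand'), (-5.9427185, 0.6761626999999999, 8.128204)),
]
rdd_ori = sc.parallelize(rows)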

Let me go through it step by step.

After performing the following mapValues function

rdd_ori.mapValues(lambda x: (x,1))

the RDD data will look like this:

((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
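To see this intermediate shape yourself, you can print a few records (a quick check, using the rdd_ori from the question or from the in-memory sketch above):

# Each value after mapValues is ((x, y, z), 1): the coordinates plus a counter of 1.
for record in rdd_ori.mapValues(lambda x: (x, 1)).take(4):
    print(record)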

So when reduceByKey is invoked as

.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))

all the rows with the same key are grouped and the values are passed to reduceByKey's lambda function.

Since in your case all the keys are the same, the values are passed to the a and b variables over successive iterations.

In the first iteration, a is ((-5.9427185, 0.6761626999999999, 8.128204), 1) and b is ((-5.958191, 0.6880646, 8.135345), 1), so the calculation (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is valid and passes.

In the second iteration, a is the output of the previous (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) call, which is a flat 4-tuple like (-11.910430999999999, 1.3582764, 16.271881, 2).
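You can reproduce that second iteration with plain Python, without Spark at all; the variable names below are only illustrative:

# The question's combiner returns a FLAT 4-tuple instead of ((x, y, z), count).
combine = lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1],
                        a[0][2] + b[0][2], a[1] + b[1])

v1 = ((-5.9427185, 0.6761626999999999, 8.128204), 1)
v2 = ((-5.958191, 0.6880646, 8.135345), 1)
v3 = ((-5.95224, 0.6702118, 8.136536), 1)

a = combine(v1, v2)        # first iteration: works, a is now a flat 4-tuple
try:
    combine(a, v3)         # second iteration: a[0] is a float, so a[0][0] fails
except TypeError as e:
    print(e)               # 'float' object is not subscriptable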

So if you look at the format of that data, there is no a[0][0] in a anymore; you can only get a[0], a[1], and so on. That is the issue, and that is exactly what the error message is telling you:

TypeError: 'float' object is not subscriptable

The solution is to keep the data in a shape where a[0][0] remains accessible, which you can do by writing your reduceByKey in the following form:

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
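With that change the accumulator keeps the ((x, y, z), count) shape on every iteration, so a[0][0] stays valid. A plain-Python sketch of the same fold (names are illustrative):

fixed = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]),
                      a[1] + b[1])

values = [((-5.958191, 0.6880646, 8.135345), 1),
          ((-5.95224, 0.6702118, 8.136536), 1),
          ((-5.9950867, 0.6535491999999999, 8.204376), 1),
          ((-5.9427185, 0.6761626999999999, 8.128204), 1)]

acc = values[0]
for v in values[1:]:
    acc = fixed(acc, v)    # always ((sum_x, sum_y, sum_z), count)

print(acc)                 # roughly ((-23.8482362, 2.6879883, 32.604461), 4)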

But that would break your last mapValues function

.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

because your value, i.e. a in the lambda function, is now ((-23.848236199999995, 2.6879882999999998, 32.604461), 4): a[0] is (-23.848236199999995, 2.6879882999999998, 32.604461), a[1] is 4, and there is nothing beyond that, so you would encounter

IndexError: tuple index out of range

So your last mapValues should be

.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
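As a quick sanity check on the arithmetic, dividing the totals above by the count of 4 gives the per-key means:

sums, count = (-23.848236199999995, 2.6879882999999998, 32.604461), 4
means = tuple(s / count for s in sums)
print(means)   # approximately (-5.9621, 0.6720, 8.1511)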

So overall, the following code should work for you:

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) \
    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
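A small usage sketch of the final pipeline. One caveat that is my assumption, not part of the original answer: if asdasd.csv really contains the header row shown in the question, the float() casts would fail on it, so the sketch drops that line first.

header = sc.textFile("asdasd.csv").first()              # "Index,Arrival_Time,..."
data = sc.textFile("asdasd.csv").filter(lambda line: line != header)

rdd_ori = data.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),
                              (float(x.split(",")[3]), float(x.split(",")[4]), float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) \
    .mapValues(lambda a: (a[0][0]/a[1], a[0][1]/a[1], a[0][2]/a[1]))

print(meanRDD.take(1))
# With the four sample rows this prints roughly
# [(('a', 'nexus4', 'stand'), (-5.9621, 0.6720, 8.1511))]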

I hope I have explained it well enough.
