Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKey


Question

My "asdasd.csv" file has the following structure:

 Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand

OK, I get the following {key, value} tuple to operate on:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
#           part A (key)               part B (value) 

My code for calculating the mean is the following; I have to calculate the mean of each of the columns x, y and z for each key:

rdd_ori = sc.textFile("asdasd.csv") \
        .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
            .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
            .mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

My problem is that I tried this code and it works fine on another PC with the same VM I'm using to develop it (PySpark, Python 3).

Here is an example showing that this code is correct:

But I don't know why I'm getting this error; the important part is in bold.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
in ()
      9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0], x[0][1]+y[0][1], x[0][2]+y[0][2]))
     10
---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num)
   1341
   1342         p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343         res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345         items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990         # SparkContext#runJob.
    991         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "", line 3, in
TypeError: 'float' object is not subscriptable

Answer

Here's how reduceByKey works. I'll use your example for illustration, i.e. the following data that you pass to reduceByKey:

#                                 x           y        z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
#           part A (key)               part B (value)       counter

Let me walk through it step by step.

When the following mapValues function is executed,

rdd_ori.mapValues(lambda x: (x,1))

the RDD data looks like

((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))

So when reduceByKey is invoked as

.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))

all the rows with the same key are grouped together and their values are passed to the lambda function.

Since in your case all the keys are the same, the values are passed to the a and b variables in successive iterations.

In the first iteration, a is ((-5.9427185, 0.6761626999999999, 8.128204), 1) and b is ((-5.958191, 0.6880646, 8.135345), 1), so the calculation (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is valid and passes.

In the second iteration, a is the output of the first iteration, (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]), i.e. a flat 4-tuple such as (-11.910430999999999, 1.3582764, 16.271881, 2) (the exact numbers depend on the order in which the rows are combined), while b is still a ((x, y, z), 1) pair.

So if you look at the shape of that data, there is no a[0][0] in a; you can only index a[0], a[1], and so on. That is the issue, and that is exactly what the error message says:

TypeError: 'float' object is not subscriptable
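
The failure is easy to reproduce outside Spark. The following is a minimal pure-Python sketch (illustrative only, not part of the original code) that applies the same lambda with functools.reduce to the four ((x, y, z), 1) values shown above; the first call succeeds, and the second raises the same TypeError because a has already collapsed into a flat 4-tuple of floats.

from functools import reduce

values = [((-5.9427185, 0.6761626999999999, 8.128204), 1),
          ((-5.958191, 0.6880646, 8.135345), 1),
          ((-5.95224, 0.6702118, 8.136536), 1),
          ((-5.9950867, 0.6535491999999999, 8.204376), 1)]

# The same lambda the question passes to reduceByKey
combine = lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])

# First call: both a and b are ((x, y, z), 1), so a[0][0] works.
# Second call: a is now (x_sum, y_sum, z_sum, 2), so a[0] is a float and
# a[0][0] raises TypeError: 'float' object is not subscriptable.
reduce(combine, values)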

The solution is to format the data so that a[0][0] remains a valid access, which you can do by writing your reduceByKey in the following form:

.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
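
As a quick sanity check (again pure Python, illustrative only), with this version of the lambda the accumulated value keeps the ((x, y, z), count) shape after every step, so a[0][0] stays valid:

from functools import reduce

values = [((-5.9427185, 0.6761626999999999, 8.128204), 1),
          ((-5.958191, 0.6880646, 8.135345), 1)]

# Corrected lambda: the three sums stay wrapped in their own inner tuple
combine = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])

reduce(combine, values)
# -> roughly ((-11.9009095, 1.3642273, 16.263549), 2), still a ((x, y, z), count) pair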

But that would break your last mapValues function,

.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))

because your values, i.e. a in the lambda function, now look like ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] is (-23.848236199999995, 2.6879882999999998, 32.604461) and a[1] is 4, and there is no index beyond that, so you would encounter

IndexError: tuple index out of range
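
A tiny illustration, using the reduced value quoted above: the value is a 2-tuple, so only indices 0 and 1 exist.

a = ((-23.848236199999995, 2.6879882999999998, 32.604461), 4)
a[0]    # the tuple of sums
a[1]    # the count, 4
a[3]    # IndexError: tuple index out of range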

So your last mapValues should be

.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))

So, putting it all together, the following code should work for you:

rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))

meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\
    .mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
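
As an illustrative check (assuming the four sample rows shown above are the whole file, and hedging on the exact float representation), the reduced value for the single key is ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so the final result should be approximately:

meanRDD.take(1)
# [(('a', 'nexus4', 'stand'), (-5.96205905, 0.671997075, 8.15111525))]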

I hope I have explained it well enough.
