Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKey
Problem description
my "asdasd.csv" file has the following structure.
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
OK, I get the following {key, value} tuple to work with:
# x y z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
# part A (key) part B (value)
My code for calculating the mean is the following; I have to calculate the mean of each column (X, Y, Z) for each key.
rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]), (float(x.split(",")[3]), float(x.split(",")[4]), float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1])) \
    .mapValues(lambda a: (a[0]/a[3], a[1]/a[3], a[2]/a[3]))
My problem is that I tried that code and it works fine on another PC with the same VM I'm using for development (PySpark, Py3).
Here is an example showing that this code is correct.
But I don't know why I'm getting this error; the important part is in bold:
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in ()
      9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x0+y0,x[0][2]+y[0][2]))
     10
---> 11 print(meanRDD.take(1))

/opt/spark/current/python/pyspark/rdd.py in take(self, num)
   1341
   1342     p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343     res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345     items += res

/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    990     # SparkContext#runJob.
    991     mappedRDD = rdd.mapPartitions(partitionFunc)
--> 992     port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    993     return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    994

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131     answer = self.gateway_client.send_command(command)
   1132     return_value = get_return_value(
-> 1133         answer, self.gateway_client, self.target_id, self.name)
   1134
   1135     for temp_arg in temp_args:

/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw)
     61 def deco(*a, **kw):
     62     try:
---> 63         return f(*a, **kw)
     64     except py4j.protocol.Py4JJavaError as e:
     65         s = e.java_exception.toString()

/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317     raise Py4JJavaError(
    318         "An error occurred while calling {0}{1}{2}. ".
--> 319         format(target_id, ".", name), value)
    320     else:
    321         raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func
    return f(iterator)
  File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally
    merger.mergeValues(iterator)
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
  File "", line 3, in
TypeError: 'float' object is not subscriptable
Here's how reduceByKey works. I am taking your example for illustration, i.e. with the following data that you pass to reduceByKey:
# x y z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
# part A (key) part B (value) counter
Let me go step by step.
After performing the following mapValues function
rdd_ori.mapValues(lambda x: (x,1))
the RDD data will look like this:
((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
So when reduceByKey is invoked as
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))
all the rows with the same key are grouped and the values are passed to the lambda function of reduceByKey.
Since in your case all the keys are the same, the values are passed to the a and b variables in the following iterations.
In the first iteration, a is ((-5.9427185, 0.6761626999999999, 8.128204), 1) and b is ((-5.958191, 0.6880646, 8.135345), 1), so the calculation part (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is correct and passes.
In the second iteration, a is the output of (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]), which is (-11.910430999999999, 1.3582764, 16.271881, 2).
So if you look at the format of the data, there is no such a[0][0] in a anymore. You can only get a[0], a[1], and so on. That's the issue, and that's what the error message is suggesting too:
TypeError: 'float' object is not subscriptable
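To see this outside Spark, here is a minimal pure-Python sketch that mimics how reduceByKey folds the values of a single key; the names values and buggy_reducer are mine, not from the original code, and the data is the four rows shown above.

from functools import reduce

# The four (value, count) pairs produced by mapValues(lambda x: (x, 1)) above.
values = [
    ((-5.9427185, 0.6761626999999999, 8.128204), 1),
    ((-5.958191, 0.6880646, 8.135345), 1),
    ((-5.95224, 0.6702118, 8.136536), 1),
    ((-5.9950867, 0.6535491999999999, 8.204376), 1),
]

# The original reducer: note that it returns a flat 4-tuple of floats.
buggy_reducer = lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1],
                              a[0][2] + b[0][2], a[1] + b[1])

# The first call succeeds; on the second call `a` is already the flat 4-tuple,
# so a[0] is a float and a[0][0] raises the error.
reduce(buggy_reducer, values)  # TypeError: 'float' object is not subscriptable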
The solution to this is to format the data so that you can still access a[0][0], which can be done if you write your reduceByKey in the following format:
.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
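Continuing the same pure-Python sketch, a reducer of this shape keeps the value as ((sum_x, sum_y, sum_z), count), so the fold now succeeds:

fixed_reducer = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1],
                               a[0][2] + b[0][2]), a[1] + b[1])

# The result is now ((sum_x, sum_y, sum_z), count), i.e. roughly
# ((-23.8482362, 2.6879883, 32.604461), 4), the value quoted below.
reduce(fixed_reducer, values)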
But that would trouble your last mapValues function
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
because your value, i.e. a in the lambda function, is now ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] means (-23.848236199999995, 2.6879882999999998, 32.604461) and a[1] means 4; there is nothing beyond that, so you would encounter
IndexError: tuple index out of range
So your last mapValues should be
.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
So overall, the following code should work for you:
rdd_ori = sc.textFile("asdasd.csv") \
    .map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]), (float(x.split(",")[3]), float(x.split(",")[4]), float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1])) \
    .mapValues(lambda a: (a[0][0]/a[1], a[0][1]/a[1], a[0][2]/a[1]))
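As a quick sanity check (a sketch, assuming the four sample rows above make up the whole file), taking one element should give the per-key means, i.e. each column sum divided by the count of 4:

# Expected shape: (key, (mean_x, mean_y, mean_z)); with the sample rows the
# means are approximately -5.96206, 0.67200 and 8.15112.
print(meanRDD.take(1))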
I hope I have explained it well enough.