Pyspark - TypeError: 'float' object is not subscriptable when calculating mean using reduceByKey
Question
My "asdasd.csv" file has the following structure:
Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1424696631913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,a,nexus4,nexus4_1,stand
2,1424696633918,1424696631923288855,-5.9950867,0.6535491999999999,8.204376,a,nexus4,nexus4_1,stand
3,1424696633919,1424696631928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,nexus4_1,stand
OK, I get the following {key, value} tuple to operate with:
# x y z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345)))]
# part A (key) part B (value)
My code for calculating the mean is the following; I have to calculate the mean of each column (x, y, z) for each key.
rdd_ori = sc.textFile("asdasd.csv") \
.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x,1)) \
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))\
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
My problem is that I tried this code and it works fine on another PC with the same VM I'm using for development (PySpark Py3).

Here is an example showing that this code is correct:

But I don't know why I'm getting this error; the important part is in bold.
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) in () 9 #sum_1 = count_.reduceByKey(lambda x, y: (x[0][0]+y[0][0],x0+y0,x[0][2]+y[0][2])) 10 ---> 11 print(meanRDD.take(1))
/opt/spark/current/python/pyspark/rdd.py in take(self, num) 1341 1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts)) --> 1343 res = self.context.runJob(self, takeUpToNumLeft, p) 1344 1345 items += res
/opt/spark/current/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal) 990 # SparkContext#runJob. 991 mappedRDD = rdd.mapPartitions(partitionFunc) --> 992 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions) 993 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer)) 994
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in call(self, *args) 1131 answer = self.gateway_client.send_command(command) 1132 return_value = get_return_value( -> 1133 answer, self.gateway_client, self.target_id, self.name) 1134 1135 for temp_arg in temp_args:
/opt/spark/current/python/pyspark/sql/utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString()
/opt/spark/current/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 317 raise Py4JJavaError( 318 "An error occurred while calling {0}{1}{2}.\n". --> 319 format(target_id, ".", name), value) 320 else: 321 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 127.0 failed 1 times, most recent failure: Lost task 0.0 in stage 127.0 (TID 102, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main process() File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 2423, in pipeline_func return func(split, prev_func(split, iterator)) File "/opt/spark/current/python/pyspark/rdd.py", line 346, in func return f(iterator) File "/opt/spark/current/python/pyspark/rdd.py", line 1842, in combineLocally merger.mergeValues(iterator) File "/opt/spark/current/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues d[k] = comb(d[k], v) if k in d else creator(v) File "", line 3, in TypeError: 'float' object is not subscriptable
Answer
Here's how reduceByKey works. I'll use your example for illustration, i.e. with the following data that you pass to reduceByKey:
# x y z
[(('a', 'nexus4', 'stand'), ((-5.958191, 0.6880646, 8.135345), 1))]
# part A (key) part B (value) counter
Let me go step by step. After the following mapValues function is applied,
rdd_ori.mapValues(lambda x: (x,1))
the rdd data will look like
((u'a', u'nexus4', u'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
((u'a', u'nexus4', u'stand'), ((-5.958191, 0.6880646, 8.135345), 1))
((u'a', u'nexus4', u'stand'), ((-5.95224, 0.6702118, 8.136536), 1))
((u'a', u'nexus4', u'stand'), ((-5.9950867, 0.6535491999999999, 8.204376), 1))
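There is nothing Spark-specific about this step; as a minimal pure-Python sketch (reusing two of the sample rows above), mapValues(lambda x: (x, 1)) just pairs each value tuple with a counter of 1:

```python
# Pure-Python sketch of mapValues(lambda x: (x, 1)):
# each value tuple is wrapped together with a counter of 1.
rows = [
    (('a', 'nexus4', 'stand'), (-5.9427185, 0.6761626999999999, 8.128204)),
    (('a', 'nexus4', 'stand'), (-5.958191, 0.6880646, 8.135345)),
]
with_counter = [(key, (value, 1)) for key, value in rows]
print(with_counter[0])
# (('a', 'nexus4', 'stand'), ((-5.9427185, 0.6761626999999999, 8.128204), 1))
```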
So when reduceByKey is invoked as
.reduceByKey(lambda a, b: (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]))
all the rows with the same key are grouped, and the values are passed to reduceByKey's lambda function.
Since in your case all the keys are the same, the values are passed to the a and b variables over the following iterations.
In the first iteration, a is ((-5.9427185, 0.6761626999999999, 8.128204), 1) and b is ((-5.958191, 0.6880646, 8.135345), 1), so the calculation (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]) is correct and passes.
In the second iteration, a is the output of (a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2], a[1] + b[1]), which is (-11.910430999999999, 1.3582764, 16.271881, 2).
So if you look at the format of that data, there is no such a[0][0] in a; you can only get a[0], a[1], and so on. That's the issue, and it's exactly what the error message is telling you:
TypeError: 'float' object is not subscriptable
The solution is to keep the data in a shape where a[0][0] remains accessible, which you can do by writing your reduceByKey in the following format:
.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))
But that would break your last mapValues function
.mapValues(lambda a : (a[0]/a[3], a[1]/a[3],a[2]/a[3]))
because the value, i.e. a in the lambda function, is now ((-23.848236199999995, 2.6879882999999998, 32.604461), 4), so a[0] is (-23.848236199999995, 2.6879882999999998, 32.604461) and a[1] is 4; there are no further indices, so you would encounter
IndexError: tuple index out of range
So your last mapValues should be
.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
So altogether, the following code should work for you:
rdd_ori = sc.textFile("asdasd.csv") \
.map(lambda x: ((x.split(",")[6], x.split(",")[7], x.split(",")[9]),(float(x.split(",")[3]),float(x.split(",")[4]),float(x.split(",")[5]))))
meanRDD = rdd_ori.mapValues(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1], a[0][2] + b[0][2]), a[1] + b[1]))\
.mapValues(lambda a : (a[0][0]/a[1], a[0][1]/a[1],a[0][2]/a[1]))
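As a sanity check without a Spark cluster, the same fold and final division can be simulated with functools.reduce over the four sample rows (a plain-Python sketch; the numbers come from the CSV at the top):

```python
from functools import reduce

values = [
    ((-5.958191, 0.6880646, 8.135345), 1),
    ((-5.95224, 0.6702118, 8.136536), 1),
    ((-5.9950867, 0.6535491999999999, 8.204376), 1),
    ((-5.9427185, 0.6761626999999999, 8.128204), 1),
]

# Fixed reducer: keeps the ((sum_x, sum_y, sum_z), counter) shape
# stable across iterations, so a[0][0] is always valid.
good_reducer = lambda a, b: ((a[0][0] + b[0][0], a[0][1] + b[0][1],
                              a[0][2] + b[0][2]), a[1] + b[1])

sums, count = reduce(good_reducer, values)
mean = (sums[0] / count, sums[1] / count, sums[2] / count)
print(count, mean)
```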
I hope I have explained it well enough.