如何在Spark RDD中不使用combiningByKey和aggregateByKey获取指定的输出 [英] How to get the specified output without combineByKey and aggregateByKey in spark RDD

查看:291
本文介绍了如何在Spark RDD中不使用combiningByKey和aggregateByKey获取指定的输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

以下是我的数据:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", bar=C","bar=D", "bar=D")  

现在,我要使用以下类型的输出,但是使用combineByKeyaggregateByKey 不使用:

Now I want below types of output but without using combineByKey and aggregateByKey:

1) Array[(String, Int)] = Array((foo,5), (bar,3))  
2) Array((foo,Set(B, A)),
(bar,Set(C, D)))  

以下是我的尝试:

scala> val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C",
     | "bar=D", "bar=D")  
scala> val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))
sample: Array[(String, String)] = Array((foo,A), (foo,A), (foo,A), (foo,A), (foo,B), (bar,C), (bar,D), (bar,D))  

现在,当我在变量名后面键入制表符以查看适用于映射的RDD的方法时,我可以看到以下选项,但其中任何一个都不满足我的要求:

Now when I type the variable name followed by tab to see the applicable methods for the mapped RDD I can see the below options out of which none can satisfy my requirement:

scala> sample.
apply          asInstanceOf   clone          isInstanceOf   length         toString       update         

那么我该如何实现呢?

推荐答案

这是一种标准方法.

注意点:您需要使用RDD.我认为这是瓶颈.

Point to note: you need to be working with an RDD. I think that is the bottleneck.

您在这里:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C","bar=D", "bar=D") 

val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))

val sample2 = sc.parallelize(sample.map(x => (x._1, 1)))
val sample3 = sample2.reduceByKey(_+_) 
sample3.collect()

val sample4 = sc.parallelize(sample.map(x => (x._1, x._2))).groupByKey()   
sample4.collect()

val sample5 = sample4.map(x => (x._1, x._2.toSet))
sample5.collect()

这篇关于如何在Spark RDD中不使用combiningByKey和aggregateByKey获取指定的输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆