无法指望用reduceByKey字((V1,V2)=> V1 + V2)斯卡拉功能火花 [英] Unable to count words using reduceByKey((v1,v2) => v1 + v2) scala function in spark

查看:358
本文介绍了无法指望用reduceByKey字((V1,V2)=> V1 + V2)斯卡拉功能火花的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚开始学习的火花。在独立模式下使用火花试图做字数在Scala中。我观察到的问题是reduceByKey()不分组的话预期。空阵列被打印。我也跟着步骤如下...

创建一个文本文件,其中包括用空格分隔一些话。
在火花贝壳我执行下面的命令。

 斯卡拉>进口org.apache.spark.SparkContext
进口org.apache.spark.SparkContext斯卡拉>进口org.apache.spark.SparkContext._
进口org.apache.spark.SparkContext._斯卡拉>进口org.apache.spark.SparkConf
进口org.apache.spark.SparkConf斯卡拉>进口scala.io.Source
进口scala.io.SourceVAL的conf =新SparkConf()。setAppName(你好)
VAL SC =新SparkContext(CONF)斯卡拉> VAL文本文件= sc.textFile(文件:///goutham/tweet.txt)
15/09/20四点00分32秒INFO storage.MemoryStore:ensureFreeSpace(250576)调用curMem = 277327,MAXMEM = 280248975
15/09/20四点00分32秒INFO storage.MemoryStore:阻止broadcast_48存储在内存中的值(估计大小244.7 KB,免费266.8 MB)
15/09/20四点00分32秒INFO storage.MemoryStore:ensureFreeSpace(25159)调用curMem = 527903,MAXMEM = 280248975
15/09/20四点00分32秒INFO storage.MemoryStore:阻止broadcast_48_piece0存储在内存中的字节(估计大小24.6 KB,免费266.7 MB)
15/09/20四点00分32秒INFO storage.BlockManagerInfo:在内存中添加broadcast_48_piece0在localhost:50471(尺寸:24.6 KB,自由:267.2 MB)
15/09/20四点00分32秒INFO spark.SparkContext:从创建的文本文件播放48 AT<&控制台GT;:29文本文件:org.apache.spark.rdd.RDD [字符串] = MapPartitionsRDD [46]在TEXTFILE AT&LT ;控制台>:29
斯卡拉> VAL WC = textFile.flatMap(行=> line.split())图(字=>(字,1))。缓存()
WC:org.apache.spark.rdd.RDD [(字符串,整数)] = MapPartitionsRDD [48]在地图AT<&控制台GT;:31斯卡拉> wc.collect()
res26:数组[(字符串,整数)] =阵列((一,1),(二,1),(三,1),(一,1),(七,1),(十,1))斯卡拉>无功输出= wc.reduceByKey((V1,V2)= GT; V1 + V2).collect()的foreach(的println)。
15/09/20 4时06分59秒INFO storage.BlockManagerInfo:localhost上移除broadcast_49_piece0:50471内存(尺寸:1955.0 B,免费:267.2 MB)
15/09/20 4时06分59秒INFO spark.ContextCleaner:清洗洗牌20
15/09/20 4时06分59秒INFO storage.BlockManagerInfo:localhost上移除broadcast_50_piece0:在内存50471(尺寸:2.2 KB,自由:267.2 MB)
15/09/20 4时06分59秒INFO storage.BlockManagerInfo:localhost上移除broadcast_51_piece0:50471内存(尺寸:1369.0 B,免费:267.2 MB)

输出:org.apache.spark.rdd.RDD [(字符串,整数)] = ShuffledRDD [50]在reduceByKey在:39

 斯卡拉> output.collect()
15/09/20 4时09分03秒INFO spark.SparkContext:开始的工作:收集AT<&控制台GT;:42
15/09/20 4时09分03秒INFO spark.MapOutputTrackerMaster:输出状态,为洗牌21的大小是143字节
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:GOT工作3​​0(收集AT<&控制台GT;:42)1输出分区(allowLocal = FALSE)
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:最终阶段:ResultStage 54(收集AT<&控制台GT;:42)
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:最后阶段的家长:列表(ShuffleMapStage 53)
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:缺少父母:列表()
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:提交ResultStage 54(ShuffledRDD [50]在reduceByKey AT<&控制台GT;:39),它没有父母失踪
15/09/20 4时09分03秒INFO storage.MemoryStore:ensureFreeSpace(2304)调用curMem = 563738,MAXMEM = 280248975
15/09/20 4时09分03秒INFO storage.MemoryStore:阻止broadcast_54存储在内存中的值(估计大小2.3 KB,免费266.7 MB)
15/09/20 4时09分03秒INFO storage.MemoryStore:ensureFreeSpace(1366)调用curMem = 566042,MAXMEM = 280248975
15/09/20 4时09分03秒INFO storage.MemoryStore:阻止broadcast_54_piece0存储字节的存储器(估计规模1366.0 B,免费266.7 MB)
15/09/20 4时09分03秒INFO storage.BlockManagerInfo:在内存中添加broadcast_54_piece0在localhost:50471(尺寸:1366.0 B,免费:267.2 MB)
15/09/20 4时09分03秒INFO spark.SparkContext:创建从广播播出54在DAGScheduler.scala:874
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:(在reduceByKey ShuffledRDD [50] AT<&控制台GT;:39)从ResultStage 54提交1人失踪任务
15/09/20 4时09分03秒INFO scheduler.TaskSchedulerImpl:添加任务设置54.0 1任务
15/09/20 4时09分03秒INFO scheduler.TaskSetManager:在第一阶段54.0(TID 53,本地主机,PROCESS_LOCAL 1165字节)开始任务0.0
15/09/20 4时09分03秒INFO executor.Executor:在第一阶段54.0运行任务0.0(TID 53)
15/09/20 4时09分03秒INFO storage.ShuffleBlockFetcherIterator:0获得非空蔽日0块
15/09/20 4时09分03秒INFO storage.ShuffleBlockFetcherIterator:远程启动0取0毫秒
15/09/20 4时09分03秒INFO executor.Executor:在第一阶段54.0成品任务0.0(TID 53)。 882个字节的结果发送到驱动器
15/09/20 4时09分03秒INFO scheduler.TaskSetManager:完成任务0.0在本地主机上的3毫秒级54.0(TID 53)(1/1)
15/09/20 4时09分03秒INFO scheduler.TaskSchedulerImpl:删除taskset的54.0,其任务已全部建成后,从池
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:ResultStage 54(收集AT<&控制台GT;:42)0.004织完了
15/09/20 4时09分03秒INFO scheduler.DAGScheduler:工作30完成:收集AT<&控制台GT; 42,花了0.047307小号
res29:数组[(字符串,整数)] =阵列()==>>在这里,我没有得到预期的输出。可能任何一个请让我知道我做了错误呢?

PS ::我尝试下面的步骤了。但仍无法获得字数。

 斯卡拉> VAL WC = textFile.flatMap(行=> line.split())图(字=>(字,1))。缓存()
斯卡拉> VAL输出= wc.reduceByKey((V1,V2)= GT; V1 + V2).collect()
15/09/20 6时59分06秒INFO spark.SparkContext:开始的工作:收集AT<&控制台GT;:25
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:注册RDD 3(图AT<&控制台GT;:23)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:GOT作业3(收集AT<&控制台GT;:25)1输出分区(allowLocal = FALSE)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:最终阶段:ResultStage 7(收集AT<&控制台GT;:25)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:最后阶段的家长:列表(ShuffleMapStage 6)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:缺少父母:列表(ShuffleMapStage 6)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:提交ShuffleMapStage 6(MapPartitionsRDD [3]在地图AT<&控制台GT;:23),它没有父母失踪
15/09/20 6时59分06秒INFO storage.MemoryStore:ensureFreeSpace(4112)调用curMem = 286320,MAXMEM = 280248975
15/09/20 6时59分06秒INFO storage.MemoryStore:阻止broadcast_7存储在内存中的值(估计大小4.0 KB,免费267.0 MB)
15/09/20 6时59分06秒INFO storage.MemoryStore:ensureFreeSpace(2315)调用curMem = 290432,MAXMEM = 280248975
15/09/20 6时59分06秒INFO storage.MemoryStore:阻止broadcast_7_piece0存储在内存中的字节(估计大小2.3 KB,免费267.0 MB)
15/09/20 6时59分06秒INFO storage.BlockManagerInfo:在内存中添加broadcast_7_piece0在localhost:46205(尺寸:2.3 KB,自由:267.2 MB)
15/09/20 6时59分06秒INFO spark.SparkContext:在DAG​​Scheduler.scala从广播电视播出创建7:874
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:从ShuffleMapStage 6提交1人失踪任务([3]在地图AT&LT MapPartitionsRDD;控制台>:23)
15/09/20 6时59分06秒INFO scheduler.TaskSchedulerImpl:添加任务设置6.0与1任务
15/09/20 6时59分06秒INFO scheduler.TaskSetManager:在第一阶段6.0(TID 6,本地主机,PROCESS_LOCAL 1385字节)开始任务0.0
15/09/20 6时59分06秒INFO executor.Executor:在第一阶段6.0运行的任务0.0(TID 6)
15/09/20 6时59分06秒INFO storage.BlockManager:找到的块rdd_3_0本地
15/09/20 6时59分06秒INFO executor.Executor:在第一阶段6.0(TID 6)成品任务0.0。 2056字节结果发送到驱动器
15/09/20 6时59分06秒INFO scheduler.TaskSetManager:完成任务0.0在本地主机上的59毫秒级6.0(TID 6)(1/1)
15/09/20 6时59分06秒INFO scheduler.TaskSchedulerImpl:删除taskset的6.0,其任务已全部建成后,从池
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:ShuffleMapStage 6(图AT<&控制台GT; 23)完成了0.055小号
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:寻找新的可运行阶段
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:运行:设置()
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:等待:设置(ResultStage 7)
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:失败:设置()
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:缺少父母ResultStage 7:列表()
15/09/20 6点59分06秒INFO scheduler.DAGScheduler:提交ResultStage 7(ShuffledRDD [7]在reduceByKey在&下;控制台>:25),这是目前可运行
15/09/20 6时59分06秒INFO storage.MemoryStore:ensureFreeSpace(2288)调用curMem = 292747,MAXMEM = 280248975
15/09/20 6时59分06秒INFO storage.MemoryStore:阻止broadcast_8存储在内存中的值(估计大小2.2 KB,免费267.0 MB)
15/09/20 6时59分06秒INFO storage.MemoryStore:ensureFreeSpace(1368)调用curMem = 295035,MAXMEM = 280248975
15/09/20 6时59分06秒INFO storage.MemoryStore:阻止broadcast_8_piece0存储字节的存储器(估计规模1368.0 B,免费267.0 MB)
15/09/20 6时59分06秒INFO storage.BlockManagerInfo:在内存中添加broadcast_8_piece0在localhost:46205(尺寸:1368.0 B,免费:267.2 MB)
15/09/20 6时59分06秒INFO spark.SparkContext:在DAG​​Scheduler.scala从广播电视播出创建8:874
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:(;控制台>对于AT&LT reduceByKey [7] ShuffledRDD:25)从ResultStage 7提交1人失踪任务
15/09/20 6时59分06秒INFO scheduler.TaskSchedulerImpl:添加任务设置7.0 1任务
15/09/20 6时59分06秒INFO scheduler.TaskSetManager:在第一阶段7.0(TID 7,本地主机,PROCESS_LOCAL,1165字节)开始任务0.0
15/09/20 6时59分06秒INFO executor.Executor:在第一阶段7.0运行的任务0.0(TID 7)
15/09/20 6时59分06秒INFO spark.MapOutputTrackerMaster:不要有地图产出洗牌3,获取他们
15/09/20 6时59分06秒INFO spark.MapOutputTrackerMaster:做抓取;跟踪端点= AkkaRpcEndpointRef(演员[阿卡:// sparkDriver /用户/ MapOutputTracker#194665441])
15/09/20 6时59分06秒INFO spark.MapOutputTrackerMasterEndpoint:要求发送地图输出位置的洗牌3为localhost:45959
15/09/20 6时59分06秒INFO spark.MapOutputTrackerMaster:输出状态,为洗牌3大小为82字节
15/09/20 6时59分06秒INFO spark.MapOutputTrackerMaster:得到的输出位置
15/09/20 6时59分06秒INFO storage.ShuffleBlockFetcherIterator:0获得非空蔽日0块
15/09/20 6时59分06秒INFO storage.ShuffleBlockFetcherIterator:远程启动0取0毫秒
15/09/20 6时59分06秒INFO executor.Executor:在第一阶段7.0(TID 7)成品任务0.0。 882个字节的结果发送到驱动器
15/09/20 6时59分06秒INFO scheduler.TaskSetManager:完成任务0.0在本地主机上的19毫秒级7.0(TID 7)(1/1)
15/09/20 6时59分06秒INFO scheduler.TaskSchedulerImpl:删除taskset的7.0,其任务已全部建成后,从池
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:ResultStage 7(收集AT<&控制台GT;:25)完成了0.015小号
15/09/20 6时59分06秒INFO scheduler.DAGScheduler:作业3完成:收集AT<&控制台GT; 25,花了0.173682小号
输出:数组[(字符串,整数)] =阵列()斯卡拉>输出的foreach的println斯卡拉>


解决方案

VAR输出= wc.reduceByKey。((V1,V2)=> V1 + V2).collect()的foreach(的println)本身就说明了你想要的阵列及其错误收集输出再次,因为它是单位。如果你想在本地阵列的形式 reduceByKey 的结果,你应该只收集你的 RDD 。在这种情况下,你的 RDD wc.reduceByKey((V1,V2)=> V1 + V2)。所以,试试这个 VAR输出= wc.reduceByKey((V1,V2)=> V1 + V2).collect()

I just started learning spark. Using spark in the standalone mode and trying to do word count in scala. The issue I have observed is reduceByKey() is not grouping the words as expected. NULL array is printed. The steps I have followed are follows...

create a text file and include some words separated by spaces. In the spark shell I am executing the below commands.

scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._

scala> import org.apache.spark.SparkConf
import org.apache.spark.SparkConf

scala> import scala.io.Source
import scala.io.Source

val conf = new SparkConf().setAppName("hello")
val sc = new SparkContext(conf)

scala> val textFile = sc.textFile("file:///goutham/tweet.txt")
15/09/20 04:00:32 INFO storage.MemoryStore: ensureFreeSpace(250576) called      with curMem=277327, maxMem=280248975 
15/09/20 04:00:32 INFO storage.MemoryStore: Block broadcast_48 stored as values in memory (estimated size 244.7 KB, free 266.8 MB)
15/09/20 04:00:32 INFO storage.MemoryStore: ensureFreeSpace(25159) called with curMem=527903, maxMem=280248975
15/09/20 04:00:32 INFO storage.MemoryStore: Block broadcast_48_piece0 stored as bytes in memory (estimated size 24.6 KB, free 266.7 MB)
15/09/20 04:00:32 INFO storage.BlockManagerInfo: Added broadcast_48_piece0 in memory on localhost:50471 (size: 24.6 KB, free: 267.2 MB)
15/09/20 04:00:32 INFO spark.SparkContext: Created broadcast 48 from textFile at <console>:29 textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[46] at textFile at <console>:29


scala> val wc = textFile.flatMap(line => line.split(" ")).map( word =>(word,1)).cache()
wc: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at   <console>:31

scala> wc.collect()
res26: Array[(String, Int)] = Array((one,1), (two,1), (three,1), (one,1), (seven,1), (ten,1))

scala> var output = wc.reduceByKey((v1,v2) => v1 + v2).collect().foreach(println)
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_49_piece0 on localhost:50471 in memory (size: 1955.0 B, free: 267.2 MB)
15/09/20 04:06:59 INFO spark.ContextCleaner: Cleaned shuffle 20
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_50_piece0 on localhost:50471 in memory (size: 2.2 KB, free: 267.2 MB)
15/09/20 04:06:59 INFO storage.BlockManagerInfo: Removed broadcast_51_piece0 on localhost:50471 in memory (size: 1369.0 B, free: 267.2 MB)

output: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[50] at reduceByKey at :39

scala> output.collect()
15/09/20 04:09:03 INFO spark.SparkContext: Starting job: collect at <console>:42
15/09/20 04:09:03 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 21 is 143 bytes
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Got job 30 (collect at <console>:42) with 1 output partitions (allowLocal=false)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Final stage: ResultStage 54(collect at <console>:42)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 53)
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Missing parents: List()
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Submitting ResultStage 54 (ShuffledRDD[50] at reduceByKey at <console>:39), which has no missing parents
15/09/20 04:09:03 INFO storage.MemoryStore: ensureFreeSpace(2304) called with curMem=563738, maxMem=280248975
15/09/20 04:09:03 INFO storage.MemoryStore: Block broadcast_54 stored as values in memory (estimated size 2.3 KB, free 266.7 MB)
15/09/20 04:09:03 INFO storage.MemoryStore: ensureFreeSpace(1366) called with curMem=566042, maxMem=280248975
15/09/20 04:09:03 INFO storage.MemoryStore: Block broadcast_54_piece0 stored as bytes in memory (estimated size 1366.0 B, free 266.7 MB)
15/09/20 04:09:03 INFO storage.BlockManagerInfo: Added broadcast_54_piece0 in memory on localhost:50471 (size: 1366.0 B, free: 267.2 MB)
15/09/20 04:09:03 INFO spark.SparkContext: Created broadcast 54 from broadcast at DAGScheduler.scala:874
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 54 (ShuffledRDD[50] at reduceByKey at <console>:39)
15/09/20 04:09:03 INFO scheduler.TaskSchedulerImpl: Adding task set 54.0 with 1 tasks
15/09/20 04:09:03 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 54.0 (TID 53, localhost, PROCESS_LOCAL, 1165 bytes)
15/09/20 04:09:03 INFO executor.Executor: Running task 0.0 in stage 54.0 (TID 53)
15/09/20 04:09:03 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/09/20 04:09:03 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/09/20 04:09:03 INFO executor.Executor: Finished task 0.0 in stage 54.0 (TID 53). 882 bytes result sent to driver
15/09/20 04:09:03 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 54.0 (TID 53) in 3 ms on localhost (1/1)
15/09/20 04:09:03 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 54.0, whose tasks have all completed, from pool 
15/09/20 04:09:03 INFO scheduler.DAGScheduler: ResultStage 54 (collect at <console>:42) finished in 0.004 s
15/09/20 04:09:03 INFO scheduler.DAGScheduler: Job 30 finished: collect at <console>:42, took 0.047307 s
res29: Array[(String, Int)] = Array()

==>> Here I am not getting the expected output. Could any one please let me know where I did the mistake ?

PS:: I tried the following steps too. But still unable to get word count.

scala> val wc = textFile.flatMap(line => line.split(" ")).map( word => (word,1)).cache()
scala> val output = wc.reduceByKey((v1,v2) => v1 + v2).collect()
15/09/20 06:59:06 INFO spark.SparkContext: Starting job: collect at <console>:25
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:23)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Got job 3 (collect at <console>:25) with 1 output partitions (allowLocal=false)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 7(collect at <console>:25)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 6)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 6)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 6 (MapPartitionsRDD[3] at map at <console>:23), which has no missing parents
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(4112) called with curMem=286320, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 4.0 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(2315) called with curMem=290432, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 2.3 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:46205 (size: 2.3 KB, free: 267.2 MB)
15/09/20 06:59:06 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:874
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 6 (MapPartitionsRDD[3] at map at <console>:23)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 6, localhost, PROCESS_LOCAL, 1385 bytes)
15/09/20 06:59:06 INFO executor.Executor: Running task 0.0 in stage 6.0 (TID 6)
15/09/20 06:59:06 INFO storage.BlockManager: Found block rdd_3_0 locally
15/09/20 06:59:06 INFO executor.Executor: Finished task 0.0 in stage 6.0 (TID 6). 2056 bytes result sent to driver
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 6) in 59 ms on localhost (1/1)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
15/09/20 06:59:06 INFO scheduler.DAGScheduler: ShuffleMapStage 6 (map at <console>:23) finished in 0.055 s
15/09/20 06:59:06 INFO scheduler.DAGScheduler: looking for newly runnable stages
15/09/20 06:59:06 INFO scheduler.DAGScheduler: running: Set()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 7)
15/09/20 06:59:06 INFO scheduler.DAGScheduler: failed: Set()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Missing parents for ResultStage 7: List()
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting ResultStage 7 (ShuffledRDD[7] at reduceByKey at <console>:25), which is now runnable
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(2288) called with curMem=292747, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 2.2 KB, free 267.0 MB)
15/09/20 06:59:06 INFO storage.MemoryStore: ensureFreeSpace(1368) called with curMem=295035, maxMem=280248975
15/09/20 06:59:06 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 1368.0 B, free 267.0 MB)
15/09/20 06:59:06 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:46205 (size: 1368.0 B, free: 267.2 MB)
15/09/20 06:59:06 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:874
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 7 (ShuffledRDD[7] at reduceByKey at <console>:25)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Adding task set 7.0 with 1 tasks
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 7.0 (TID 7, localhost, PROCESS_LOCAL, 1165 bytes)
15/09/20 06:59:06 INFO executor.Executor: Running task 0.0 in stage 7.0 (TID 7)
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Don't have map outputs for shuffle 3, fetching them
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Doing the fetch; tracker endpoint = AkkaRpcEndpointRef(Actor[akka://sparkDriver/user/MapOutputTracker#194665441])
15/09/20 06:59:06 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 3 to localhost:45959
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 3 is 82 bytes
15/09/20 06:59:06 INFO spark.MapOutputTrackerMaster: Got the output locations
15/09/20 06:59:06 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks
15/09/20 06:59:06 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/09/20 06:59:06 INFO executor.Executor: Finished task 0.0 in stage 7.0 (TID 7). 882 bytes result sent to driver
15/09/20 06:59:06 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 7.0 (TID 7) in 19 ms on localhost (1/1)
15/09/20 06:59:06 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool 
15/09/20 06:59:06 INFO scheduler.DAGScheduler: ResultStage 7 (collect at <console>:25) finished in 0.015 s
15/09/20 06:59:06 INFO scheduler.DAGScheduler: Job 3 finished: collect at <console>:25, took 0.173682 s
output: Array[(String, Int)] = Array()

scala> output foreach println

scala> 

解决方案

The var output = wc.reduceByKey((v1,v2) => v1 + v2).collect().foreach(println) itself shows your desired array and its wrong to collect output again, because it is Unit. If you want the result of reduceByKey in form of a local array, you should only collect your RDD. In this case your RDD is wc.reduceByKey((v1,v2) => v1 + v2). So try this var output = wc.reduceByKey((v1,v2) => v1 + v2).collect()

这篇关于无法指望用reduceByKey字((V1,V2)=&GT; V1 + V2)斯卡拉功能火花的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆