How do I iterate RDDs in Apache Spark (Scala)


Problem Description

I use the following command to fill an RDD with a bunch of arrays containing 2 strings ["filename", "content"]:

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

Now I want to iterate over every one of those occurrences to do something with each filename and content.

I can't seem to find any documentation on how to do this, however.

So what I want is this:

foreach occurrence-in-the-rdd {
   // do stuff with the array found at location n of the RDD
}

Recommended Answer

The fundamental operations in Spark are map and filter.

val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }

The txtRDD will now only contain the files that have the extension ".txt".

And if you want to word count those files, you can say:

// split the documents into words in one long list
val words = txtRDD flatMap { case (id, text) => text.split("\\s+") }
// give each word a count of 1
val wordsT = words map (x => (x, 1))
// sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)
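That gives you a wordCount RDD of (word, count) pairs. To actually look at or iterate over the results, you call an action; a minimal sketch, assuming the RDDs defined above and results small enough to bring back to the driver:

// collect() pulls the RDD back to the driver as an Array; use only for small results
wordCount.collect() foreach { case (word, count) => println(s"$word: $count") }

// foreach runs on the executors, so any println output ends up in the executor logs
txtRDD foreach { case (filename, content) => println(s"$filename has ${content.length} characters") }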

You want to use mapPartitions when you have some expensive initialization you need to perform -- for example, if you want to do Named Entity Recognition with a library like the Stanford CoreNLP tools.
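A minimal sketch of that pattern, using a hypothetical (and deliberately cheap) Tokenizer class as a stand-in for an expensive-to-build NLP pipeline:

// hypothetical stand-in for an expensive resource such as a CoreNLP pipeline
class Tokenizer {
  def tokenize(s: String): Array[String] = s.split("\\s+")
}

val tokenized = txtRDD mapPartitions { iter =>
  val tokenizer = new Tokenizer()  // constructed once per partition, not once per record
  iter.map { case (id, text) => (id, tokenizer.tokenize(text)) }
}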

Master map, filter, flatMap, and reduce, and you are well on your way to mastering Spark.
