How to use mapPartitions in Spark Scala?

Question

I have DocsRDD : RDD[(String, String)]

val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)

DocsRDD:

Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n  .....\n bla bla bla bla \n ... bla
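
Each element of DocsRDD is a (filePath, fileContents) pair, since that is what wholeTextFiles returns. A quick illustrative check (the printed path is hypothetical):

    DocsRDD.take(1).foreach { case (path, contents) =>
      println(path)               // e.g. file:/.../Doc1.txt
      println(contents.take(40))  // first 40 characters of that file
    }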

Is there an efficient, elegant way to extract n-grams from these with mapPartitions? So far I have tried everything; I have read everything I could find about mapPartitions at least five times, but I still cannot understand how to use it! It seems way too difficult to manipulate. In short, I want:

val NGramsRDD = DocsRDD.map(x => (x._1, x._2.sliding(n)))
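
Here sliding(n) over the file contents produces character n-grams; for word n-grams, one would split on whitespace first. A quick local illustration (plain Scala, no Spark needed, made-up inputs):

    "spark".sliding(3).toList
    // List("spa", "par", "ark")

    "bla bla bla".split("\\s+").sliding(2).map(_.mkString(" ")).toList
    // List("bla bla", "bla bla")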

but efficiently, with mapPartitions. My basic misunderstanding of mapPartitions is:

OneDocRDD : RDD[String]

 val OneDocRDD = sc.textFile("myDoc1.txt", 2)
                   .mapPartitions(s1 => ...)  // conceptually: s1 : Iterator[String] => s2 : Iterator[String]

I cannot understand this! Since when is s1 an Iterator[String]? After sc.textFile, each element is a String.

Alright, my second question is: will mapPartitions give me any improvement over map in this situation?

Last but not least: can f() be:

     f(Iterator[String]) : Iterator[Something else?]

Answer

I'm not sure that .mapPartitions will help (at least, not in this example), but using .mapPartitions would look like:

val NGramsRDD = sc.wholeTextFiles("myDirectory/*", 2)
  .mapPartitions(iter => {
    // Here you can initialize objects that you need to create once per
    // partition (once per task), rather than once for each element as in .map.
    // iter is an Iterator[(String, String)] of (fileName, fileContents) pairs.
    iter.map(x => (x._1, x._2.sliding(n)))
  })

Normally you want to use .mapPartitions to create/initialize objects that you don't want to serialize to the worker nodes (for example, because they are too big) or that can't be serialized at all. Without .mapPartitions you would need to create them inside .map, which is inefficient because the object would then be created once for each element.
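
As an illustrative sketch (the regex and the variable names are mine, not from the original answer): the function given to .mapPartitions receives each whole partition as an Iterator[String], which is why s1 above is an Iterator, and it may return an Iterator of a different element type. So yes, f(Iterator[String]) : Iterator[Something else] is fine:

    val perLine = OneDocRDD.mapPartitions { lines =>
      // lines: Iterator[String], the entire partition at once
      val wordPattern = "\\w+".r  // built once per partition, not once per line
      lines.map(line => (line, wordPattern.findAllIn(line).length))
      // result: Iterator[(String, Int)], a different element type than the input
    }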
