Spark processing columns in parallel


Problem Description

I've been playing with Spark, and I managed to get it to crunch my data. My data consists of a flat, delimited text file with 50 columns and about 20 million rows. I have Scala scripts that will process each column.

In terms of parallel processing, I know that RDD operations run on multiple nodes. So every time I process a column, that column is processed in parallel, but the columns themselves are processed sequentially.

A simple example: my data is a 5-column, tab-delimited file, each column contains text, and I want to do a word count for each column. I would do:

// Word count for each of the 5 columns, one column per (sequential) pass
for (i <- 0 until 5) {
  data.map(_.split("\t", -1)(i)).map((_, 1)).reduceByKey(_ + _)
}

Although each column's operation runs in parallel, the columns themselves are processed sequentially (bad wording, I know. Sorry!). In other words, column 2 is processed after column 1 is done, column 3 after columns 1 and 2 are done, and so on.

My question is: is there any way to process multiple columns at a time? If you know a way, or a tutorial, would you mind sharing it with me?

Thanks!!

Answer

Suppose the input is a Seq. The following can be done to process the columns concurrently. The basic idea is to use the pair (column index, value) as the key.

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

In the example output, ((0,x_0),4) means: for the first column (index 0), the key is x_0 and its count is 4. You can start from here and process further, as sketched below.
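As a minimal sketch of how the same keying idea could be applied to the original 50-column, tab-delimited file (the file path is hypothetical and not from the original post; sc is the spark-shell SparkContext):

// Hypothetical input path; adjust to your data.
val lines = sc.textFile("hdfs:///path/to/data.tsv")

// Key every token by its column index, so all columns are counted in a single job.
val perColumnCounts = lines
  .map(_.split("\t", -1))
  .flatMap(fields => fields.zipWithIndex.map { case (value, idx) => ((idx, value), 1) })
  .reduceByKey(_ + _)

// Optionally regroup by column index to get one word-count collection per column.
val byColumn = perColumnCounts
  .map { case ((idx, value), count) => (idx, (value, count)) }
  .groupByKey()

Because every (column index, word) pair goes through a single reduceByKey, all 50 columns are counted in one Spark job instead of 50 sequential ones.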

