Apache Spark's RDD splitting according to a particular size

Problem Description

I am trying to read strings from a text file, but I want to limit each line according to a particular size. For example:

Here is a representation of my file:

aaaaa\nbbb\nccccc

When reading this file with sc.textFile, the RDD comes out like this:

scala> val rdd = sc.textFile("textFile")
scala> rdd.collect
res1: Array[String] = Array(aaaaa, bbb, ccccc)

But I want to limit the size of the strings in this RDD. For example, if the limit is 3, then I should get something like this:

Array[String] = Array(aaa, aab, bbc, ccc, c)

What is the most performant way to do that?

Answer

Not a particularly efficient solution (not terrible either) but you can do something like this:

import org.apache.spark.RangePartitioner

val pairs = rdd
  .flatMap(x => x)   // Flatten lines into individual characters
  .zipWithIndex      // Attach a global position to every character
  .keyBy(_._2 / 3)   // Key each character by its chunk number (position / n)

// A range partitioner keeps consecutive chunk keys together,
// which minimizes the shuffle
val partitioner = new RangePartitioner(pairs.partitions.size, pairs)

val chunks = pairs
  .groupByKey(partitioner)  // Group characters by chunk number
  // Within each chunk: sort by position, drop the position, concatenate
  .mapValues(_.toSeq.sortBy(_._2).map(_._1).mkString(""))
  .sortByKey()              // Restore global chunk order
  .values
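
Collecting on the sample file from the question should then reproduce the desired output (the array below is the question's expected result, not output from a live session):

chunks.collect()
// Array[String] = Array(aaa, aab, bbc, ccc, c)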

It is possible to avoid the shuffle by passing the data required to fill the partitions explicitly, but it takes some effort to code. See my answer to Partition RDD into tuples of length n.
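
The linked answer has the full details. Purely as an illustration, a rough sketch of one such shuffle-free approach might look like the following (my own reconstruction, not the code from that answer: it collects per-partition character counts and the first n - 1 characters of each partition to the driver, then stitches chunk boundaries locally):

import org.apache.spark.rdd.RDD

// Sketch only: trades the shuffle for two extra, cheap actions
def chunkWithoutShuffle(rdd: RDD[String], n: Int): RDD[String] = {
  val chars = rdd.flatMap(_.iterator).cache()

  // Pass 1: characters per partition, and each partition's global start offset
  val counts = chars.mapPartitions(it => Iterator(it.size.toLong)).collect()
  val offsets = counts.scanLeft(0L)(_ + _)

  // Pass 2: the first n - 1 characters of every partition
  // (small enough to collect and broadcast)
  val heads = chars.mapPartitions(it => Iterator(it.take(n - 1).mkString)).collect()
  val bcHeads = chars.sparkContext.broadcast(heads)

  chars.mapPartitionsWithIndex { (i, it) =>
    // Characters that complete a chunk started in an upstream partition
    val skip = ((n - offsets(i) % n) % n).toInt
    // Characters borrowed from downstream partitions; at most n - 1 of them
    // are ever consumed, so concatenating the heads is safe
    val borrowed = bcHeads.value.drop(i + 1).mkString
    val data = it.drop(skip).mkString + borrowed
    // Emit exactly the chunks that start in this partition; the final
    // global chunk may legitimately be shorter than n
    val numChunks = ((counts(i) - skip + n - 1) / n).toInt
    data.grouped(n).take(numChunks)
  }
}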

If you can accept some misaligned records on partition boundaries, then a simple mapPartitions with grouped should do the trick at a much lower cost:

rdd.mapPartitions(_.flatMap(x => x).grouped(3).map(_.mkString("")))
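
To see the misalignment concretely (a sketch; how elements are assigned to partitions is up to Spark, so the split shown is just one plausible outcome):

val rdd2 = sc.parallelize(Seq("aaaaa", "bbb", "ccccc"), 2)
rdd2.mapPartitions(_.flatMap(x => x).grouped(3).map(_.mkString(""))).collect()
// If the partitions come out as ["aaaaa"] and ["bbb", "ccccc"], this gives
// Array(aaa, aa, bbb, ccc, cc) -- the chunking restarts at the boundary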

It is also possible to use a sliding RDD:

import org.apache.spark.mllib.rdd.RDDFunctions._  // provides sliding on RDDs

rdd.flatMap(x => x).sliding(3, 3).map(_.mkString(""))
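
One caveat, to the best of my understanding of sliding: unlike grouped, it emits only full-length windows, so the trailing partial chunk ("c" in the example) would be dropped:

rdd.flatMap(x => x).sliding(3, 3).map(_.mkString("")).collect()
// Likely: Array(aaa, aab, bbc, ccc) -- no trailing "c"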
