Multiline Spark sliding window

Question

I am learning Apache Spark with Scala and would like to use it to process a DNA data set that spans multiple lines like this:

ATGTAT
ACATAT
ATATAT

I want to map this into groups of a fixed size k and count the groups. So for k=3, we would get groups of each character with the next two characters:

ATG TGT GTA TAT ATA TAC 
ACA CAT ATA TAT ATA TAT 
ATA TAT ATA TAT

...then count the groups (like word count):

(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1)
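
For reference, here is the computation I want, expressed with plain Scala collections over the concatenated sequence (a local sanity check only, not the distributed version I am asking about):

val seq = "ATGTAT" + "ACATAT" + "ATATAT"
seq.sliding(3).toList.groupBy(identity).mapValues(_.size)
// Map(ATA -> 5, TAT -> 5, TAC -> 1, ACA -> 1, CAT -> 1, ATG -> 1, TGT -> 1, GTA -> 1)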

The problem is that the "words" span multiple lines, as does TAC in the example above. It spans the line wrap. I don't want to just count the groups in each line, but in the whole file, ignoring line endings.

In other words, I want to process the entire sequence as a sliding window of width k over the entire file as though there were no line breaks. The problem is looking ahead (or back) to the next RDD row to complete a window when I get to the end of a line.

I have two ideas:

  1. Append k-1 characters from the next line:

ATGTATAC
ACATATAT
ATATAT

I tried this with the Spark SQL lead() function, but when I tried executing a flatMap, I got a NotSerializableException for WindowSpec. Is there any other way to reference the next line? Would I need to write a custom input format?
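
For what it's worth, a minimal sketch of keeping idea 1 entirely inside the DataFrame API, so that no WindowSpec is captured in an RDD closure (this assumes k=3, illustrative column names, and that monotonically_increasing_id reflects the original line order, which holds for a plain single-file read):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val k = 3
val df = spark.read.textFile("gene.txt").toDF("line")
  .withColumn("id", monotonically_increasing_id())

// lead() pulls the following line; append its first k-1 characters to this one.
// Note: a window with no partitioning moves all rows through a single partition.
val w = Window.orderBy("id")
val padded = df
  .withColumn("next", lead("line", 1, "").over(w))
  .select(concat($"line", substring($"next", 1, k - 1)).as("padded"))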

  2. Read the entire sequence as a single line (or join the lines after reading):

ATGTATACATATATATAT

Is there a way to read multiple lines so they can be processed as one? If so, would it all need to fit into the memory of a single machine?
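
For what it's worth, sc.wholeTextFiles reads each file as a single (path, content) record, which gives the single-line form at the cost of the whole content needing to fit in one executor's memory; a sketch with k=3:

val kmers = sc.wholeTextFiles("gene.txt")
  .values  // one record per file: the entire content as one string
  .flatMap(content => content.filterNot(_ == '\n').sliding(3))

kmers.collect
// Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)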

I realize either of these could be done as a pre-processing step. I was wondering what the best way would be to do it within Spark. Once I have it in either of these formats, I know how to do the rest, but I am stuck here.

Answer

You can build an RDD of single-character strings rather than joining the lines into one line, since joining would produce a single String value, which cannot be distributed:

val rdd = sc.textFile("gene.txt")
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24

So simply use flatMap to split each line into its characters:

rdd.flatMap(_.split("")).collect
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)


A more complete solution, borrowed from this answer:

val rdd = sc.textFile("gene.txt")

// create the sliding 3 grams for each partition and record the edges
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => {
  val slideList = iter.toList.sliding(3).toList
  Iterator((slideList, (slideList.head, slideList.last)))
})

// collect the edge values, concatenate edges from adjacent partitions and broadcast it
val edgeValues = rdd1.values.collect

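// For each pair of adjacent partitions, x._2 is the last 3-gram of the left
// partition and y._1 the first 3-gram of the right one. Concatenating them gives
// the 6 characters around the boundary; dropping one character from each end and
// sliding(3) again yields exactly the two 3-grams that straddle the boundary.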
val sewedEdges = edgeValues zip edgeValues.tail map { case (x, y) => {
  (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList
}}

val sewedEdgesMap = sc.broadcast(
  ((0 until rdd1.partitions.size) zip sewedEdges).toMap
)

// sew the edge values back to the result
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
  flatMap(_.map(_ mkString "")).collect

// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)
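
To finish with the counts the question asked for, bind the same pipeline (before the collect) to a name and run an ordinary word count over it (a sketch; output order may vary):

val grams = rdd1.keys.
  mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
  flatMap(_.map(_ mkString ""))

grams.map((_, 1)).reduceByKey(_ + _).collect
// Array((ATA,5), (TAT,5), (ATG,1), (TGT,1), (GTA,1), (TAC,1), (ACA,1), (CAT,1))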
