Multiline Spark sliding window
Problem description
I am learning Apache Spark with Scala and would like to use it to process a DNA data set that spans multiple lines like this:
ATGTAT
ACATAT
ATATAT
I want to map this into groups of a fixed size k and count the groups. So for k=3, we would get groups of each character with the next two characters:
ATG TGT GTA TAT ATA TAC
ACA CAT ATA TAT ATA TAT
ATA TAT ATA TAT
...then count the groups (like word count):
(ATA,5), (TAT,5), (TAC,1), (ACA,1), (CAT,1), (ATG,1), (TGT,1), (GTA,1)
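For reference, the desired result is easy to compute on a single machine with plain Scala's `sliding`; this is only a local sketch of the target computation (sample lines hard-coded), not the distributed Spark solution:

```scala
// Local sketch (no Spark): join the lines, slide a window of width k,
// and count each k-character group.
val lines = Seq("ATGTAT", "ACATAT", "ATATAT")
val k = 3
val joined = lines.mkString // ignore line endings
val counts = joined
  .sliding(k)               // all windows of width k
  .toList
  .groupBy(identity)
  .map { case (kmer, occurrences) => (kmer, occurrences.size) }
// counts("ATA") == 5 and counts("TAC") == 1, matching the expected output above
```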
The problem is that the "words" span multiple lines, as does TAC
in the example above. It spans the line wrap. I don't want to just count the groups in each line, but in the whole file, ignoring line endings.
In other words, I want to process the entire sequence as a sliding window of width k over the entire file as though there were no line breaks. The problem is looking ahead (or back) to the next RDD row to complete a window when I get to the end of a line.
I have two ideas:
- Append k-1 characters from the next line:
ATGTATAC
ACATATAT
ATATAT
I tried this with the Spark SQL lead() function, but when I tried executing a flatMap, I got a NotSerializableException for WindowSpec. Is there any other way to reference the next line? Would I need to write a custom input format?
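As a sanity check of idea 1 outside Spark, the overlap can be built locally by appending the first k-1 characters of each following line; this is only a sketch of the intended transformation (sample lines hard-coded), not a fix for the `lead()` serialization issue:

```scala
// Local sketch of idea 1: give each line the first k-1 characters of the
// next line, then slide within each (now overlapping) line.
val lines = Vector("ATGTAT", "ACATAT", "ATATAT")
val k = 3
val overlapped = lines.zipWithIndex.map { case (line, i) =>
  val nextPrefix =
    if (i + 1 < lines.length) lines(i + 1).take(k - 1) else ""
  line + nextPrefix
}
// overlapped == Vector("ATGTATAC", "ACATATAT", "ATATAT")
val kmers = overlapped.flatMap(_.sliding(k)) // 16 groups, none lost at line ends
```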
- Read the entire sequence as a single line (or join the lines after reading):
ATATATACATATATATAT
Is there a way to read multiple lines so they can be processed as one? If so, would it all need to fit into the memory of a single machine?
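On idea 2: Spark's built-in `sc.wholeTextFiles` reads each file as a single `(path, content)` pair, so the sequence arrives unsplit; the trade-off is that each file's content must fit in a single executor's memory. A sketch, with the Spark calls shown as comments since they need a live `SparkContext`, and the per-file processing step shown on a local string:

```scala
// With Spark, one could read the whole file as one string:
//   val whole = sc.wholeTextFiles("gene.txt") // RDD[(path, content)]
//   val kmers = whole.values
//     .flatMap(_.filterNot(c => c == '\n' || c == '\r').sliding(3))
// The per-file processing step, shown locally:
val content = "ATGTAT\nACATAT\nATATAT\n"
val kmers = content
  .filterNot(c => c == '\n' || c == '\r') // drop line endings
  .sliding(3)                             // width-3 windows over the whole sequence
  .toList
```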
I realize either of these could be done as a pre-processing step, but I was wondering what the best way is to do it within Spark. Once I have it in either of these formats, I know how to do the rest, but I am stuck here.
Recommended answer
You can make an RDD of single-character strings instead of joining the lines into one line, since joining would make the result a single string that cannot be distributed:
val rdd = sc.textFile("gene.txt")
// rdd: org.apache.spark.rdd.RDD[String] = gene.txt MapPartitionsRDD[4] at textFile at <console>:24
So simply use flatMap to split the lines into a list of characters:
rdd.flatMap(_.split("")).collect
// res4: Array[String] = Array(A, T, G, T, A, T, A, C, A, T, A, T, A, T, A, T, A, T)
A more complete solution borrowed from this answer:
val rdd = sc.textFile("gene.txt")
// create the sliding 3 grams for each partition and record the edges
val rdd1 = rdd.flatMap(_.split("")).mapPartitionsWithIndex((i, iter) => {
val slideList = iter.toList.sliding(3).toList
Iterator((slideList, (slideList.head, slideList.last)))
})
// collect the edge values, concatenate edges from adjacent partitions and broadcast it
val edgeValues = rdd1.values.collect
val sewedEdges = edgeValues.zip(edgeValues.tail).map { case (x, y) =>
  (x._2 ++ y._1).drop(1).dropRight(1).sliding(3).toList
}
val sewedEdgesMap = sc.broadcast(
  (0 until rdd1.partitions.size).zip(sewedEdges).toMap
)
// sew the edge values back to the result
rdd1.keys.mapPartitionsWithIndex((i, iter) => iter ++ List(sewedEdgesMap.value.getOrElse(i, Nil))).
flatMap(_.map(_ mkString "")).collect
// res54: Array[String] = Array(ATG, TGT, GTA, TAT, ATA, TAC, ACA, CAT, ATA, TAT, ATA, TAT, ATA, TAT, ATA, TAT)
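To see what the edge-sewing step is doing, here is the same logic run locally on two simulated partitions (the characters are split by hand; in the real job Spark chooses the partition boundary):

```scala
// Two simulated "partitions" of single-character strings:
val part0 = List("A", "T", "G", "T", "A", "T", "A", "C", "A")
val part1 = List("T", "A", "T", "A", "T", "A", "T", "A", "T")
// Per-partition sliding windows, as built inside mapPartitionsWithIndex:
val s0 = part0.sliding(3).toList
val s1 = part1.sliding(3).toList
// Sew the boundary: concatenate the last window of part0 with the first
// window of part1, trim one character from each end, and slide again.
val sewed = (s0.last ++ s1.head).drop(1).dropRight(1).sliding(3).toList
// sewed recovers exactly the windows that straddle the partition boundary:
// "CAT" and "ATA"
```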