How to transpose an RDD in Spark
Question
I have an RDD like this:
1 2 3
4 5 6
7 8 9
It represents a matrix. Now I want to transpose the RDD like this:
1 4 7
2 5 8
3 6 9
How can I do this?
Recommended answer
Say you have an N×M matrix.
If N and M are both small enough that all N×M items fit in memory, there is little point in using an RDD. Transposing is easy, though:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
val transposed = sc.parallelize(rdd.collect.toSeq.transpose)
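To make the driver-side step above concrete, here is a minimal sketch that uses plain Scala collections and no SparkContext. The object name `LocalTranspose` and the value `rows` are illustrative only; `rows` stands in for the result of `rdd.collect.toSeq`.

```scala
object LocalTranspose {
  // Stand-in for rdd.collect.toSeq: the whole matrix held in driver memory.
  val rows: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))

  // transpose gathers the i-th element of every row into the i-th output row.
  val transposed: Seq[Seq[Int]] = rows.transpose

  def main(args: Array[String]): Unit =
    transposed.foreach(row => println(row.mkString(" ")))
}
```

Note that `transpose` expects all rows to have the same length, so ragged input would need to be padded or validated first.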
If N or M is so large that you cannot hold N or M entries in memory, then a single RDD row of that size cannot exist. In this case either the original or the transposed matrix is impossible to represent.
N and M may be of medium size: you can hold N or M entries in memory, but not N×M entries. In this case you have to break the matrix apart into individual cells and put it back together:
val rdd = sc.parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9)))
// Split the matrix into one number per line.
val byColumnAndRow = rdd.zipWithIndex.flatMap {
  case (row, rowIndex) => row.zipWithIndex.map {
    case (number, columnIndex) => columnIndex -> (rowIndex, number)
  }
}
// Build up the transposed matrix. Group and sort by column index first.
val byColumn = byColumnAndRow.groupByKey.sortByKey().values
// Then sort by row index.
val transposed = byColumn.map {
  indexedRow => indexedRow.toSeq.sortBy(_._1).map(_._2)
}
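The three steps above can be traced without a cluster. The following is a sketch on plain Scala collections, not the Spark code itself: `groupBy` plus `sortBy` stand in for `groupByKey` and `sortByKey`, the row index is an `Int` here (Spark's `RDD.zipWithIndex` yields a `Long`), and the object name `TransposeTrace` is made up for illustration.

```scala
object TransposeTrace {
  val matrix: Seq[Seq[Int]] = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9))

  // Step 1: emit one (columnIndex, (rowIndex, number)) pair per matrix cell.
  val byColumnAndRow: Seq[(Int, (Int, Int))] =
    matrix.zipWithIndex.flatMap { case (row, rowIndex) =>
      row.zipWithIndex.map { case (number, columnIndex) =>
        columnIndex -> (rowIndex, number)
      }
    }

  // Step 2: group the cells by column index and order the groups by that index.
  val byColumn: Seq[Seq[(Int, Int)]] =
    byColumnAndRow
      .groupBy { case (columnIndex, _) => columnIndex }
      .toSeq
      .sortBy { case (columnIndex, _) => columnIndex }
      .map { case (_, cells) => cells.map { case (_, cell) => cell } }

  // Step 3: within each group, order by row index and drop the index.
  val transposed: Seq[Seq[Int]] =
    byColumn.map(cells => cells.sortBy(_._1).map(_._2))

  def main(args: Array[String]): Unit = {
    byColumnAndRow.take(3).foreach(println) // first cells of the original row 0
    transposed.foreach(row => println(row.mkString(" ")))
  }
}
```

The explicit sort by row index in step 3 matters in the Spark version because the order of values within a group after a shuffle is not guaranteed.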