How do I split an RDD into two or more RDDs?


Problem description

I'm looking for a way to split an RDD into two or more RDDs. The closest I've seen is Scala Spark: Split collection into several RDD?, which is still a single RDD.

If you're familiar with SAS, something like this:

data work.split1 work.split2;
    set work.preSplit;

    if (condition1) then
        output work.split1;
    else if (condition2) then
        output work.split2;
run;

which resulted in two distinct data sets. It would have to be immediately persisted to get the results I intend...

Recommended answer

It is not possible to yield multiple RDDs from a single transformation*. If you want to split an RDD, you have to apply a filter for each split condition. For example:

def even(x): return x % 2 == 0
def odd(x): return not even(x)

# sc is an existing SparkContext
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If you have only a binary condition and the computation is expensive, you may prefer something like this:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()  # compute the predicate once and keep the tagged data for both splits

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

This means the predicate is computed only once per element, but it requires an additional pass over all of the data.
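As a usage note, once both splits have been consumed the cached intermediate can be released; a minimal sketch reusing the kv_rdd defined above:

# The first action materializes and caches kv_rdd; the second reuses the cache.
print(rdd_odd.count(), rdd_even.count())
# Release the cached data once neither split is needed anymore.
kv_rdd.unpersist()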

It is important to note that, as long as the input RDD is properly cached and there are no additional assumptions regarding data distribution, there is no significant difference in time complexity between the repeated filter and a for-loop with nested if-else.

With N elements and M conditions, the number of operations you have to perform is clearly proportional to N times M. In the for-loop case it should be closer to (N + MN) / 2, and the repeated filter is exactly NM, but at the end of the day it is still O(NM). You can see my discussion** with Jason Lenderman to read about some pros and cons.
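For intuition, here is a toy, purely local illustration (plain Python, not Spark) of the two strategies the estimate compares; conditions is a hypothetical list of predicates:

values = list(range(20))
conditions = [lambda x: x % 3 == 0, lambda x: x % 3 == 1, lambda x: x % 3 == 2]

# Repeated filter: M separate passes, exactly N * M predicate evaluations.
splits_filter = [[x for x in values if cond(x)] for cond in conditions]

# Single pass with nested if/else: each element stops at the first matching
# condition, so on average roughly (N + N * M) / 2 evaluations.
splits_loop = [[] for _ in conditions]
for x in values:
    for i, cond in enumerate(conditions):
        if cond(x):
            splits_loop[i].append(x)
            break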

At a very high level you should consider two things:

  1. Spark transformations are lazy; until you execute an action, your RDD is not materialized.

Why does it matter? Going back to my example:

 rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If later I decide that I need only rdd_odd, then there is no reason to materialize rdd_even.
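As a small illustration, assuming the rdd_odd / rdd_even pair defined above:

# Only this action triggers a Spark job, and only the rdd_odd lineage is
# evaluated; rdd_even remains an unmaterialized description of a computation.
print(rdd_odd.count())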

If you take a look at your SAS example, to compute work.split2 you need to materialize both the input data and work.split1.

  2. RDDs provide a declarative API. When you use filter or map, it is completely up to the Spark engine how the operation is performed. As long as the functions passed to transformations are free of side effects, this creates multiple possibilities for optimizing the whole pipeline.

At the end of the day this case is not special enough to justify its own transformation.

This map with filter pattern is actually used in core Spark. See my answer to How does Spark's RDD.randomSplit actually split the RDD and the relevant part of the randomSplit method.
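For illustration only, here is a toy PySpark sketch of that map-with-filter idea; this is not Spark's actual randomSplit implementation, and assign_bucket is a hypothetical helper:

import random

def assign_bucket(x, weights=(0.5, 0.5), seed=42):
    # Deterministically map x to a bucket index with probabilities given
    # by weights (stable for the integer rdd used above).
    r = random.Random(hash((x, seed))).random()
    acc = 0.0
    for i, w in enumerate(weights):
        acc += w
        if r < acc:
            return i
    return len(weights) - 1

tagged = rdd.map(lambda x: (x, assign_bucket(x))).cache()
split_a = tagged.filter(lambda kv: kv[1] == 0).keys()
split_b = tagged.filter(lambda kv: kv[1] == 1).keys()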

If the only goal is to achieve a split on input, it is possible to use the partitionBy clause of DataFrameWriter with the text output format:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy("key").format("text").save(...)  // one output directory per distinct key
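A rough PySpark sketch of the same idea, assuming an active SparkSession, the odd predicate from above, and a placeholder output path:

pairs = rdd.map(lambda x: ("odd" if odd(x) else "even", str(x)))
df = pairs.toDF(["key", "value"])

# The text source expects a single string data column; "key" becomes a
# partition directory (key=odd/, key=even/) under the output path.
df.write.partitionBy("key").format("text").save("/tmp/split-by-key")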


* There are only 3 basic types of transformations in Spark:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

where T, U, W can be either atomic types or products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper for more details.
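For concreteness, one PySpark example of each shape (illustrative only):

nums = sc.parallelize([1, 2, 3])
words = sc.parallelize(["a", "b"])

same_type = nums.filter(lambda x: x > 1)   # RDD[T] => RDD[T]
new_type = nums.map(str)                   # RDD[T] => RDD[U]
combined = nums.cartesian(words)           # (RDD[T], RDD[U]) => RDD[W]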

** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** See also Scala Spark: Split collection into several RDD?
