How do I split an RDD into two or more RDDs?

Problem description

I'm looking for a way to split an RDD into two or more RDDs. The closest I've seen is Scala Spark: Split collection into several RDD?, which still yields a single RDD.

If you're familiar with SAS, something like this:

data work.split1 work.split2;
    set work.preSplit;

    if (condition1) then
        output work.split1;
    else if (condition2) then
        output work.split2;
run;

which resulted in two distinct data sets. It would have to be immediately persisted to get the results I intend...

Recommended answer

It is not possible to yield multiple RDDs from a single transformation*. If you want to split an RDD you have to apply a filter for each split condition. For example:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If you have only a binary condition and computation is expensive you may prefer something like this:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

It means only a single predicate computation but requires an additional pass over all the data.
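
Note that cache itself is lazy, so the single predicate computation only actually happens once kv_rdd has been evaluated. A minimal sketch of forcing that up front (the count() action is just an illustrative choice, not part of the original snippet):

kv_rdd.count()  # any action works; it evaluates odd() exactly once and populates the cache

# the two filters above now read the cached (value, flag) pairs instead of recomputing odd()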

It is important to note that, as long as the input RDD is properly cached and there are no additional assumptions regarding data distribution, there is no significant difference in time complexity between repeated filters and a for-loop with nested if-else.

With N elements and M conditions the number of operations you have to perform is clearly proportional to N times M. In the case of the for-loop it should be closer to (N + MN) / 2, and repeated filters cost exactly NM, but at the end of the day it is nothing other than O(NM). You can see my discussion** with Jason Lenderman to read about some pros and cons.
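
To make the N times M side of that concrete, here is a small sketch with M = 3 purely illustrative modulo predicates, where each filter performs its own pass over all N cached elements:

remainders = [lambda x, m=m: x % 3 == m for m in range(3)]  # M = 3 conditions
nums = sc.parallelize(range(20))                            # N = 20 elements
nums.cache()                                                # avoid recomputing the source on every pass

splits = [nums.filter(p) for p in remainders]               # M separate passes over the data
# roughly N * M predicate evaluations in total, i.e. O(NM)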

At a very high level you should consider two things:

  1. Spark transformations are lazy; until you execute an action your RDD is not materialized.

Why does it matter? Going back to my example:

rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If later I decide that I need only rdd_odd then there is no reason to materialize rdd_even.
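
For instance (count() here is just a stand-in for whatever action you eventually run):

rdd_odd.count()  # computes the odd branch only
# rdd_even remains an unevaluated lineage; no work has been done for it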

If you take a look at your SAS example, to compute work.split2 you need to materialize both the input data and work.split1.

  2. RDDs provide a declarative API. When you use filter or map it is completely up to the Spark engine how the operation is performed. As long as the functions passed to transformations are side-effect free, this creates multiple possibilities for optimizing the whole pipeline.

At the end of the day this case is not special enough to justify its own transformation.

This map-with-filter pattern is actually used in core Spark. See my answer to How does Sparks RDD.randomSplit actually split the RDD and the relevant part of the randomSplit method.
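
As a side note, randomSplit already returns several RDDs from a single call, although it splits by random weights rather than by a predicate; a quick usage sketch:

first, second = rdd.randomSplit([0.7, 0.3], seed=42)  # two RDDs with a random 70/30 split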

If the only goal is to achieve a split on input it is possible to use the partitionBy clause of DataFrameWriter with a text output format:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  .write.partitionBy("key").format("text").save(...)
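
This writes one key=... sub-directory per distinct key, so each split ends up in its own set of files. A rough PySpark equivalent of the same idea, assuming an active SparkSession (make_pairs and the output path are illustrative, not part of the original answer):

def make_pairs(x):
    return (str(odd(x)), str(x))  # (key, value), both strings so the text format accepts them

(rdd.map(make_pairs)
    .toDF(["key", "value"])
    .write.partitionBy("key")
    .format("text")
    .save("/tmp/split-by-key"))   # produces key=True/ and key=False/ sub-directories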


* There are only 3 basic types of transformations in Spark:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

where T, U, W can be either atomic types or products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper for more details.
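
A few concrete PySpark illustrations of those three shapes (the examples themselves are arbitrary):

nums = sc.parallelize(range(10))
same_type = nums.filter(lambda x: x > 2)     # RDD[T] => RDD[T]
new_type = nums.map(lambda x: (x, str(x)))   # RDD[T] => RDD[U]

left = sc.parallelize([(1, "a"), (2, "b")])
right = sc.parallelize([(1, 10), (2, 20)])
combined = left.join(right)                  # (RDD[T], RDD[U]) => RDD[W]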

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** See also Scala Spark: Split collection into several RDD?
