How do you perform basic joins of two RDD tables in Spark using Python?


Question

How would you perform basic joins in Spark using Python? In R you could use merge() to do this. What is the syntax using Python on Spark for:


  1. Inner join

  2. Left outer join

  3. Cross join

With two tables (RDDs), each with a single column, that share a common key:

RDD(1):(key,U)
RDD(2):(key,V)

I think an inner join is something like this:

rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));

Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.

Answer

This can be done either with PairRDDs or with Spark DataFrames. Since DataFrame operations benefit from the Catalyst optimizer, the second option is worth considering.

Assuming your data looks as follows:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

With PairRDDs:

Inner join:

rdd1.join(rdd2)
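
For the sample data above, collecting the result should give one (key, (v1, v2)) pair per matching key, something like this (output order may vary):

rdd1.join(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]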

Left outer join:

rdd1.leftOuterJoin(rdd2)
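
Keys present only in rdd1 get None on the right side, so for the same data (order may vary):

rdd1.leftOuterJoin(rdd2).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6)), ('baz', (3, None))]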

Cartesian product:

rdd1.cartesian(rdd2)
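
Note that cartesian ignores keys entirely and pairs every element of rdd1 with every element of rdd2, so the result grows multiplicatively:

rdd1.cartesian(rdd2).count()  # 3 * 3 pairs of the form ((k1, v1), (k2, v2))
## 9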

Broadcast join:

  • see Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD? (a minimal sketch of the map-side approach follows below)
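
The core idea from that answer, as a minimal sketch: collect the small side, broadcast it to every executor, and join map-side without a shuffle. This assumes rdd2 fits in driver memory; to_multimap is just a local helper defined here, not a Spark API:

from collections import defaultdict

def to_multimap(pairs):
    # Group values by key so duplicate keys (like 'bar') are preserved
    m = defaultdict(list)
    for k, v in pairs:
        m[k].append(v)
    return dict(m)

# Ship the small side to every executor once instead of shuffling both sides
small = sc.broadcast(to_multimap(rdd2.collect()))

# Map-side join: emit one pair per matching value on the broadcast side
rdd1.flatMap(lambda kv: [(kv[0], (kv[1], v)) for v in small.value.get(kv[0], [])]).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]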

Finally, there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

Spark DataFrames

You can use either the SQL DSL or execute raw SQL using sqlContext.sql:

df1 = sqlContext.createDataFrame(rdd1, ('k', 'v1'))
df2 = sqlContext.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary tables to be able to use sqlContext.sql
df1.registerTempTable('df1')
df2.registerTempTable('df2')

Inner join:

# inner is a default value so it could be omitted
df1.join(df2, df1.k == df2.k, how='inner')
sqlContext.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
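
For the sample frames, either form should return the three matching rows, roughly as follows (row order and exact Row formatting vary by version):

df1.join(df2, df1.k == df2.k, how='inner').select(df1.k, 'v1', 'v2').collect()
## [Row(k='foo', v1=1, v2=4), Row(k='bar', v1=2, v2=5), Row(k='bar', v1=2, v2=6)]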

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
sqlContext.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

Cross join:

df1.join(df2)
sqlContext.sql('SELECT * FROM df1 JOIN df2')
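
As with cartesian on RDDs, this pairs every row of df1 with every row of df2:

df1.join(df2).count()
## 9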

Since 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2))

to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
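
To check whether the hint took effect, you can inspect the physical plan; with a successful broadcast join it should mention BroadcastHashJoin (the exact plan text varies by Spark version):

df1.join(broadcast(df2), df1.k == df2.k).explain()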
