Cogroup on Spark DataFrames


Problem description

I have 2 large DataFrames that need to be merged based on an association key. Using join takes too long to complete the task.

I see that using cogroup is preferred over joins in Apache Spark. Can anyone point out how to use cogroup on DataFrames, or suggest a better approach for merging 2 large DataFrames?

Thanks

Recommended answer

Spark >= 3.0

Since 3.0, Spark provides a PySpark-specific cogroup using Pandas / Arrow. The general syntax is as follows:

left.cogroup(right).apply(f)

where both left and right are GroupedData objects, and f is a COGROUPED_MAP user-defined function that takes two Pandas DataFrames and returns a Pandas DataFrame:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas.core.frame import DataFrame as PandasDataFrame

# schema describes the output Pandas DataFrame returned by f
@pandas_udf(schema, PandasUDFType.COGROUPED_MAP)
def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
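
For a concrete, runnable illustration of the same pattern, here is a minimal sketch, assuming Spark 3.0+, where the cogrouped map is applied through applyInPandas on the cogrouped object; the example DataFrames, the time/id columns, and the asof_join function are illustrative and not part of the original answer:

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative inputs that share the grouping key "id"
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

# The function receives two Pandas DataFrames, one per side of the cogroup,
# and returns a single Pandas DataFrame
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(left, right, on="time", by="id")

(df1.groupBy("id")
    .cogroup(df2.groupBy("id"))
    .applyInPandas(asof_join, schema="time long, id long, v1 double, v2 string")
    .show())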

Spark >= 1.6

The JVM KeyValueGroupedDataset provides both the Java variant:

def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R] 

and the Scala variant:

def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R] 

It is, however, intended for the "strongly" typed variants, not Dataset[Row], and is highly unlikely to contribute to your declared goal (a performance improvement).

Spark < 1.6 (this part stays valid for later versions as well, with the exception of the small API additions listed above).

DataFrame doesn't provide any equivalent of the cogroup function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you have to either create custom expressions, which is not trivial, or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same join logic as plain RDDs.

Regarding RDDs: while there exist border cases where cogroup can be favorable over join, typically that shouldn't be the case unless the result approaches a Cartesian product of the complete datasets. After all, joins on RDDs are expressed as cogroup followed by flatMapValues, and since the latter operation is local, the only real overhead is the creation of the output tuples.
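
To make that decomposition concrete, here is a minimal PySpark sketch (with made-up pair RDDs) of a join written as cogroup followed by flatMapValues:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

left = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
right = sc.parallelize([("a", "x"), ("c", "y")])

# cogroup yields (key, (left values, right values)); flatMapValues then emits
# the per-key cross product locally, which is exactly what left.join(right) returns
joined = (left.cogroup(right)
              .flatMapValues(lambda vs: [(l, r) for l in vs[0] for r in vs[1]]))

print(sorted(joined.collect()))             # [('a', (1, 'x')), ('a', (3, 'x'))]
print(sorted(left.join(right).collect()))   # same result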

If your tables contain only primitive types, you could mimic cogroup-like behavior by aggregating the columns with collect_list first, but I wouldn't expect any performance gains here.
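
For illustration only, a sketch of that collect_list workaround (the DataFrames and column names here are hypothetical):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical inputs sharing a key column
df1 = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ("key", "v1"))
df2 = spark.createDataFrame([(1, "x"), (3, "y")], ("key", "v2"))

# Collapse each side to one row per key, then join the aggregated results;
# each output row carries the full lists of values from both sides, much like cogroup
left = df1.groupBy("key").agg(F.collect_list("v1").alias("v1s"))
right = df2.groupBy("key").agg(F.collect_list("v2").alias("v2s"))

left.join(right, "key", "full_outer").show()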
