Cogroup on Spark DataFrames
Question
I have 2 large DataFrames to be merged based on an association key. Using join
takes a long time to complete the task.
I see that using cogroup
is preferred over joins in Apache Spark. Can anyone point out how to use cogroup
on DataFrames, or suggest a better approach for merging 2 large DataFrames?
Thanks
Answer
Spark >= 3.0
Since 3.0, Spark provides a PySpark-specific cogroup
using Pandas / Arrow. The general syntax is as follows:
left.cogroup(right).apply(f)
where both left
and right
are GroupedData
objects and f
is a COGROUPED_MAP
User Defined Function that takes two Pandas DataFrames
and returns a Pandas DataFrame:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas.core.frame import DataFrame as PandasDataFrame

@pandas_udf(schema, PandasUDFType.COGROUPED_MAP)
def f(left: PandasDataFrame, right: PandasDataFrame) -> PandasDataFrame: ...
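As a local illustration of what such a cogrouped-map function does, here is a minimal sketch that applies a pandas merge to the per-key slices Spark would hand to the function. The column names and data below are invented for the example:

```python
import pandas as pd

# Hypothetical merge function, analogous to the cogrouped-map UDF above;
# the "id", "v" and "w" columns are made up for this sketch.
def merge_groups(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge(left, right, on="id", how="inner")

# Simulate the two per-key slices Spark would pass for the key id == 1.
left_part = pd.DataFrame({"id": [1, 1], "v": [10, 20]})
right_part = pd.DataFrame({"id": [1], "w": [100]})
result = merge_groups(left_part, right_part)  # two rows, both with w == 100
```

In Spark itself the same function would be invoked once per key across both grouped DataFrames; in current releases the cogrouped variant is typically wired up via `applyInPandas` with an explicit result schema.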
Spark >= 1.6
The JVM KeyValueGroupedDataset
provides both a Java variant:
def cogroup[U, R](other: KeyValueGroupedDataset[K, U], f: CoGroupFunction[K, V, U, R], encoder: Encoder[R]): Dataset[R]
and a Scala one:
def cogroup[U, R](other: KeyValueGroupedDataset[K, U])(f: (K, Iterator[V], Iterator[U]) ⇒ TraversableOnce[R])(implicit arg0: Encoder[R]): Dataset[R]
It is, however, intended for "strongly" typed variants, not Dataset[Row]
, and is highly unlikely to contribute to your declared goal (performance improvement).
Spark < 1.6 (this part stays valid for later versions as well, with the exception of the small API additions listed above).
DataFrame
doesn't provide any equivalent of the cogroup
function, and complex objects are not first-class citizens in Spark SQL. The set of operations available on complex structures is rather limited, so typically you have to either create custom expressions, which is not trivial, or use UDFs and pay a performance penalty. Moreover, Spark SQL doesn't use the same join
logic as plain RDDs
.
Regarding RDDs: while there exist border cases where cogroup
can be favorable over join
, typically this shouldn't be the case unless the result approaches the Cartesian product of the complete datasets. After all, joins on RDDs are expressed using cogroup
followed by flatMapValues
, and since the latter operation is local, the only real overhead is the creation of the output tuples.
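The claim above can be sketched in plain Python: an inner join over key-value pairs is just cogroup (group both sides by key) followed by a local, flatMapValues-style expansion of the value lists. All names below are invented for the sketch:

```python
from collections import defaultdict
from itertools import product

def cogroup(left, right):
    # Group values from both sides by key, like RDD.cogroup:
    # each key maps to (values_from_left, values_from_right).
    buckets = defaultdict(lambda: ([], []))
    for k, v in left:
        buckets[k][0].append(v)
    for k, w in right:
        buckets[k][1].append(w)
    return buckets.items()

def join(left, right):
    # flatMapValues-style local expansion: per key, emit the cross
    # product of the two value lists. Keys present on only one side
    # have an empty list and so produce nothing (inner-join semantics);
    # the only extra cost is creating the output tuples.
    return [(k, (v, w))
            for k, (vs, ws) in cogroup(left, right)
            for v, w in product(vs, ws)]

pairs = join([("a", 1), ("a", 2), ("b", 3)], [("a", 10), ("c", 9)])
```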
If your tables contain only primitive types, you could mimic cogroup-like behavior by aggregating columns with collect_list
first, but I wouldn't expect any performance gains here.
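A plain-Python sketch of the collect_list idea: pre-aggregate each side into one list per key, so that the subsequent join touches exactly one packed row per key on each side. All names here are made up for illustration:

```python
from collections import defaultdict

def collect_list(rows):
    # Aggregate (key, value) pairs into one list per key,
    # mimicking Spark SQL's collect_list aggregate.
    out = defaultdict(list)
    for k, v in rows:
        out[k].append(v)
    return out

left = collect_list([("a", 1), ("a", 2), ("b", 3)])
right = collect_list([("a", 10)])

# One-row-per-key inner join; values stay packed in lists,
# so no per-key Cartesian expansion happens at join time.
joined = {k: (left[k], right[k]) for k in left.keys() & right.keys()}
```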