Spark Dataframe 是否具有与 Panda 的合并指示器等效的选项? [英] Does Spark Dataframe have an equivalent option of Panda's merge indicator?
本文介绍了Spark Dataframe 是否具有与 Panda 的合并指示器等效的选项?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
python Pandas 库包含以下函数:
DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False,right_index=False, sort=False, suffixes=('_x', '_y'), copy=True,指标=假)
指标字段结合 Panda 的 value_counts() 函数可用于快速确定联接的执行情况.
示例:
In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']})在 [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]})在 [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)出[50]:col1 col_left col_right _merge0 0 一个 NaN left_only1 1 b 2.0 两者2 2 NaN 2.0 right_only3 2 NaN 2.0 right_only
在 Spark 数据帧中检查连接性能的最佳方法是什么?
在其中 1 个答案中提供了一个自定义函数:它还没有给出正确的结果,但如果可以的话那就太好了:
ASchema = StructType([StructField('id', IntegerType(),nullable=False),StructField('name', StringType(),nullable=False)])BSchema = StructType([StructField('id', IntegerType(),nullable=False),StructField('role', StringType(),nullable=False)])AData = sc.parallelize ([ Row(1,'michel'), Row(2,'diederik'), Row(3,'rok'), Row(4,'piet')])BData = sc.parallelize ([ Row(1,'engineer'), Row(2,'lead'), Row(3,'scientist'), Row(5,'manager')])ADF = hc.createDataFrame(AData,ASchema)BDF = hc.createDataFrame(BData,BSchema)DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer")DFJOIN.show()输入:+----+--------+----+---------+|身份证|姓名|身份证|角色|+----+--------+----+---------+|1|米歇尔|1|工程师||2|迪德里克|2|铅||3|韩国|3|科学家||4|皮特|空|空||空|空|5|经理|+----+--------+----+---------+从 pyspark.sql.functions 导入 *DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\.withColumn("id", 合并(ADF["id"], BDF["id"]))\.drop(ADF["id"])\.drop(BDF["id"])DFJOINMERGE.show()输出+---+--------+---+---------+------+|身份证|姓名|身份证|角色|_merge|+---+--------+---+---------+------+|1|米歇尔|1|工程师|两者||2|迪德里克|2|铅|两者||3|韩国|3|科学家|两者||4|皮特|4|空|两者||5|空|5|经理|两者|+---+--------+---+---------+------+==>我希望 id 4 是左边的,而 id 5 是右边的.将连接更改为左":输入:+---+--------+----+---------+|身份证|姓名|身份证|角色|+---+--------+----+---------+|1|米歇尔|1|工程师||2|迪德里克|2|铅||3|韩国|3|科学家||4|皮特|空|空|+---+--------+----+---------+输出+---+--------+---+---------+------+|身份证|姓名|身份证|角色|_merge|+---+--------+---+---------+------+|1|米歇尔|1|工程师|两者||2|迪德里克|2|铅|两者||3|韩国|3|科学家|两者||4|皮特|4|空|两者|+---+--------+---+---------+------+
解决方案
试试这个:
<预><代码>>>>从 pyspark.sql.functions 导入 *>>>sdf1 = sqlContext.createDataFrame(df1)>>>sdf2 = sqlContext.createDataFrame(df2)>>>sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer")>>>sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\... .withColumn("col1", 合并(sdf1["col1"], sdf2["col1"]))\... .drop(sdf1["col1"])\... .drop(sdf2["col1"])The python Pandas library contains the following function :
DataFrame.merge(right, how='inner', on=None, left_on=None, right_on=None, left_index=False,
right_index=False, sort=False, suffixes=('_x', '_y'), copy=True,
indicator=False)
The indicator field combined with Panda's value_counts() function can be used to quickly determine how well a join performed.
Example:
In [48]: df1 = pd.DataFrame({'col1': [0, 1], 'col_left':['a', 'b']})
In [49]: df2 = pd.DataFrame({'col1': [1, 2, 2],'col_right':[2, 2, 2]})
In [50]: pd.merge(df1, df2, on='col1', how='outer', indicator=True)
Out[50]:
col1 col_left col_right _merge
0 0 a NaN left_only
1 1 b 2.0 both
2 2 NaN 2.0 right_only
3 2 NaN 2.0 right_only
What is the best way to check the performance of a join within a Spark Dataframe?
A custom function was provided in 1 of the answers: It does not yet give the correct results but it would be great if it would:
ASchema = StructType([StructField('id', IntegerType(),nullable=False),
StructField('name', StringType(),nullable=False)])
BSchema = StructType([StructField('id', IntegerType(),nullable=False),
StructField('role', StringType(),nullable=False)])
AData = sc.parallelize ([ Row(1,'michel'), Row(2,'diederik'), Row(3,'rok'), Row(4,'piet')])
BData = sc.parallelize ([ Row(1,'engineer'), Row(2,'lead'), Row(3,'scientist'), Row(5,'manager')])
ADF = hc.createDataFrame(AData,ASchema)
BDF = hc.createDataFrame(BData,BSchema)
DFJOIN = ADF.join(BDF, ADF['id'] == BDF['id'], "outer")
DFJOIN.show()
Input:
+----+--------+----+---------+
| id| name| id| role|
+----+--------+----+---------+
| 1| michel| 1| engineer|
| 2|diederik| 2| lead|
| 3| rok| 3|scientist|
| 4| piet|null| null|
|null| null| 5| manager|
+----+--------+----+---------+
from pyspark.sql.functions import *
DFJOINMERGE = DFJOIN.withColumn("_merge", when(ADF["id"].isNull(), "right_only").when(BDF["id"].isNull(), "left_only").otherwise("both"))\
.withColumn("id", coalesce(ADF["id"], BDF["id"]))\
.drop(ADF["id"])\
.drop(BDF["id"])
DFJOINMERGE.show()
Output
+---+--------+---+---------+------+
| id| name| id| role|_merge|
+---+--------+---+---------+------+
| 1| michel| 1| engineer| both|
| 2|diederik| 2| lead| both|
| 3| rok| 3|scientist| both|
| 4| piet| 4| null| both|
| 5| null| 5| manager| both|
+---+--------+---+---------+------+
==> I would expect id 4 to be left, and id 5 to be right.
Changing join to "left":
Input:
+---+--------+----+---------+
| id| name| id| role|
+---+--------+----+---------+
| 1| michel| 1| engineer|
| 2|diederik| 2| lead|
| 3| rok| 3|scientist|
| 4| piet|null| null|
+---+--------+----+---------+
Output
+---+--------+---+---------+------+
| id| name| id| role|_merge|
+---+--------+---+---------+------+
| 1| michel| 1| engineer| both|
| 2|diederik| 2| lead| both|
| 3| rok| 3|scientist| both|
| 4| piet| 4| null| both|
+---+--------+---+---------+------+
解决方案
Try this:
>>> from pyspark.sql.functions import *
>>> sdf1 = sqlContext.createDataFrame(df1)
>>> sdf2 = sqlContext.createDataFrame(df2)
>>> sdf = sdf1.join(sdf2, sdf1["col1"] == sdf2["col1"], "outer")
>>> sdf.withColumn("_merge", when(sdf1["col1"].isNull(), "right_only").when(sdf2["col1"].isNull(), "left_only").otherwise("both"))\
... .withColumn("col1", coalesce(sdf1["col1"], sdf2["col1"]))\
... .drop(sdf1["col1"])\
... .drop(sdf2["col1"])
这篇关于Spark Dataframe 是否具有与 Panda 的合并指示器等效的选项?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文