sparklyr - Including null values in an Apache Spark Join

Question

The question "Including null values in an Apache Spark Join" has answers for Scala, PySpark and SparkR, but not for sparklyr. I've been unable to figure out how to make inner_join in sparklyr treat null values in a join column as equal. Does anyone know how this can be done in sparklyr?

Recommended answer

You can invoke an implicit cross join:

library(sparklyr)
library(dplyr)

#' Return a Cartesian product of Spark tables
#'
#' @param df1 tbl_spark
#' @param df2 tbl_spark
#' @param explicit logical If TRUE use crossJoin, otherwise
#'   join without expression
#' @param suffix character suffixes to be used on duplicate names
cross_join <- function(df1, df2,
    explicit = FALSE, suffix = c("_x", "_y")) {

  # Columns present in both tables get suffixes so the result has unique names
  common_cols <- intersect(colnames(df1), colnames(df2))

  if (length(common_cols) > 0) {
    df1 <- df1 %>% rename_at(common_cols, ~ paste0(., suffix[1]))
    df2 <- df2 %>% rename_at(common_cols, ~ paste0(., suffix[2]))
  }

  # Call Dataset.crossJoin(right) or Dataset.join(right) on the JVM side,
  # then register the resulting DataFrame as a tbl_spark
  sparklyr::invoke(
    spark_dataframe(df1),
    if (explicit) "crossJoin" else "join",
    spark_dataframe(df2)) %>% sdf_register()
}

Then filter with IS NOT DISTINCT FROM:

# Enable Cross joins
sc %>% 
  spark_session() %>% 
  sparklyr::invoke("conf") %>%
  sparklyr::invoke("set", "spark.sql.crossJoin.enabled", "true")

df1 <- copy_to(sc, tibble(id1 = c(NA, "foo", "bar"), val = 1:3))
df2 <- copy_to(sc, tibble(id2 = c(NA, "foo", "baz"), val = 4:6))

df1 %>%
  cross_join(df2) %>% 
  filter(id1 %IS NOT DISTINCT FROM% id2)

# Source: spark<?> [?? x 4]
  id1   val_x id2   val_y
* <chr> <int> <chr> <int>
1 NA        1 NA        4
2 foo       2 foo       5

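Optimized execution plan (the cross join followed by the filter has been turned into an inner join on the null-safe equality operator <=>):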

<jobj[62]>
  org.apache.spark.sql.catalyst.plans.logical.Join
  Join Inner, (id1#10 <=> id2#76)
:- Project [id1#10, val#11 AS val_x#129]
:  +- InMemoryRelation [id1#10, val#11], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- Scan ExistingRDD[id1#10,val#11]
+- Project [id2#76, val#77 AS val_y#132]
   +- InMemoryRelation [id2#76, val#77], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Scan ExistingRDD[id2#76,val#77]

The <=> operator should work the same way:

df1 %>%
  cross_join(df2) %>% 
  filter(id1 %<=>% id2)
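To make the semantics of IS NOT DISTINCT FROM / <=> concrete without a Spark cluster, here is a minimal base R sketch on the same sample data. It mirrors the approach above (Cartesian product first, then a null-safe filter) on plain data frames; cross_join_df() and null_safe_eq() are hypothetical helpers written for illustration, not sparklyr or dplyr functions, and nothing here exercises Spark's optimizer.

```r
# Base R sketch (no Spark needed) of the cross join + null-safe filter approach.
# cross_join_df() and null_safe_eq() are hypothetical helpers, not sparklyr/dplyr APIs.

# Cartesian product of two data frames; like cross_join() above, columns present
# in both inputs are renamed with suffixes before joining
cross_join_df <- function(df1, df2, suffix = c("_x", "_y")) {
  common_cols <- intersect(names(df1), names(df2))
  dup1 <- names(df1) %in% common_cols
  dup2 <- names(df2) %in% common_cols
  names(df1)[dup1] <- paste0(names(df1)[dup1], suffix[1])
  names(df2)[dup2] <- paste0(names(df2)[dup2], suffix[2])
  merge(df1, df2, by = NULL)  # by = NULL returns the Cartesian product
}

# Null-safe equality, i.e. the truth table of IS NOT DISTINCT FROM / <=>:
# TRUE if both values are NA or both are equal non-NA values; never NA
null_safe_eq <- function(x, y) {
  both_missing <- is.na(x) & is.na(y)
  both_equal <- !is.na(x) & !is.na(y) & x == y
  both_missing | both_equal
}

df1 <- data.frame(id1 = c(NA, "foo", "bar"), val = 1:3, stringsAsFactors = FALSE)
df2 <- data.frame(id2 = c(NA, "foo", "baz"), val = 4:6, stringsAsFactors = FALSE)

crossed <- cross_join_df(df1, df2)  # 9 rows: id1, val_x, id2, val_y
matched <- crossed[null_safe_eq(crossed$id1, crossed$id2), ]
print(matched)  # the NA/NA pair and the foo/foo pair

# Ordinary equality loses the NA/NA pair, because NA == NA is NA, not TRUE
plain <- subset(crossed, id1 == id2)
print(plain)
```

The two surviving rows correspond to the Spark result shown earlier, while the plain == filter keeps only the foo/foo row, which is exactly the behaviour the question is trying to avoid.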

Please note:

  • An implicit cross join will fail unless it is followed by a selection that promotes the result to a hash join / sort merge join, or cross joins are enabled (spark.sql.crossJoin.enabled, which defaults to true since Spark 3.0).
  • An explicit cross join (explicit = TRUE) shouldn't be used in this case, as it takes precedence over subsequent selections.
  • It is also possible to use a dplyr-style cross join:

mutate(df1, `_const` = TRUE) %>%  
  inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>% 
  select(-`_const`) %>% 
  filter(id1 %IS NOT DISTINCT FROM% id2)

However, I'd advise against that, as it is less robust: depending on the context, the optimizer might be unable to recognize that _const is constant.
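The logical equivalence behind the dplyr-style trick can be checked outside Spark with a minimal base R sketch: an equi-join on a column that holds the same constant on both sides pairs every left row with every right row, just like a keyless Cartesian product. It says nothing about how Spark's optimizer treats the constant, which is the robustness concern above; the column name const_key is a hypothetical stand-in for _const.

```r
# Base R sketch (no Spark needed): joining on a constant key is a Cartesian product.
# "const_key" is a hypothetical helper column, standing in for `_const` above.
left <- data.frame(id1 = c(NA, "foo", "bar"), stringsAsFactors = FALSE)
right <- data.frame(id2 = c(NA, "foo", "baz"), stringsAsFactors = FALSE)

# dplyr-style trick: add the same constant to both sides and equi-join on it
via_const <- merge(transform(left, const_key = TRUE),
                   transform(right, const_key = TRUE),
                   by = "const_key")
via_const$const_key <- NULL  # drop the helper, like select(-`_const`)

# Keyless join: by = NULL returns the Cartesian product directly
direct <- merge(left, right, by = NULL)

# Both contain the same 3 x 3 = 9 (id1, id2) pairs
nrow(via_const)
nrow(direct)
```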
