How to zip two (or more) DataFrames in Spark

Question

I have two DataFrames, a and b. a looks like this:

Column 1 | Column 2
abc      |  123
cde      |  23 

b looks like this:

Column 1 
1      
2      

I want to zip a and b (or even more DataFrames) so that the result looks like this:

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

How can I do this?

Recommended answer

The DataFrame API does not support an operation like this. It is possible to zip two RDDs, but for that to work both RDDs must have the same number of partitions and the same number of elements in each partition. Assuming this is the case:

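import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}
// Needed for toDF outside spark-shell (the shell imports it automatically)
import sqlContext.implicits._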

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
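
Before relying on zip, it can help to check whether the two DataFrames actually satisfy the condition above. The following is a minimal sketch, not part of the original answer: partitionSizes and canZip are hypothetical helper names, and the code assumes Spark 2.x or later with a local SparkSession rather than the sc and sqlContext used above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: Spark 2.x or later, where SparkSession is the entry point.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("zip-check")
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: number of rows in each partition, in partition order.
def partitionSizes(df: DataFrame): Array[Int] =
  df.rdd.mapPartitions(it => Iterator(it.size)).collect()

// Hypothetical helper: true when both DataFrames have the same number of
// partitions and the same number of rows in every partition.
def canZip(left: DataFrame, right: DataFrame): Boolean =
  partitionSizes(left).sameElements(partitionSizes(right))

val a = Seq(("abc", 123), ("cde", 23)).toDF("column_1", "column_2")
val b = Seq(1, 2).toDF("column_3")
val c = Seq(1, 2, 3).toDF("column_3") // one row more than a

val abZippable = canZip(a, b)
val acZippable = canZip(a, c) // row counts differ, so this cannot hold
```

The check runs one extra job per DataFrame, so it is probably best suited to debugging or to guarding a zip on data whose partitioning is not under your control.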

If these conditions are not met, the only option that comes to mind is to add an index column and join on it:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
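
Since the question asks about "two or more" DataFrames, the index-and-join approach can be folded over a whole sequence. This is only a sketch under a few assumptions: zipAll and withIndex are hypothetical names (withIndex mirrors addIndex above, rewritten against a SparkSession), it targets Spark 2.x or later, and it requires that column names do not clash across the inputs. An explicit orderBy is included because a join does not guarantee row order.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Assumption: Spark 2.x or later, where SparkSession is the entry point.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("zip-many")
  .getOrCreate()
import spark.implicits._

// Same idea as addIndex above: append the row position as a "_index" column.
def withIndex(df: DataFrame): DataFrame = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  StructType(df.schema.fields :+ StructField("_index", LongType, nullable = false))
)

// Hypothetical helper: zip a non-empty sequence of DataFrames by row position.
def zipAll(dfs: Seq[DataFrame]): DataFrame =
  dfs.map(withIndex)
    .reduce((left, right) => left.join(right, Seq("_index")))
    .orderBy("_index") // restore positional order after the joins
    .drop("_index")

val a = Seq(("abc", 123), ("cde", 23)).toDF("column_1", "column_2")
val b = Seq(1, 2).toDF("column_3")
val c = Seq(true, false).toDF("column_4")

val abc = zipAll(Seq(a, b, c))
```

Because the joins are inner joins, rows whose index is missing from any input are dropped, so DataFrames of different lengths are truncated to the shortest one.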
