How to read a nested collection in Spark

Question

I have a parquet table with one of the columns being

, array<struct<col1,col2,..colN>>

I can run queries against this table in Hive using the LATERAL VIEW syntax.
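
For context, such a query might look like the sketch below, issued here through a HiveContext from the spark-shell; the table name nested_table and the columns key, inners, col1 and col2 are hypothetical placeholders, not from the original question.

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Explode the nested array so that each inner struct becomes its own row
// (table and column names are illustrative only).
val exploded = hiveContext.sql("""
  SELECT key, i.col1, i.col2
  FROM nested_table
  LATERAL VIEW explode(inners) t AS i
""")
exploded.show()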

How can I read this table into an RDD, and more importantly, how can I filter, map, etc. this nested collection in Spark?

Could not find any references to this in the Spark documentation. Thanks in advance for any information!

P.S. I felt it might be helpful to give some stats on the table. Number of columns in the main table: ~600. Number of rows: ~200m. Number of "columns" in the nested collection: ~10. Average number of records in the nested collection: ~35.

Answer

There is no magic in the case of a nested collection. Spark will handle an RDD[(String, String)] and an RDD[(String, Seq[String])] the same way.
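
As a small illustration of that point (my own sketch, not part of the original answer), the same filter and map calls work whether the value side is a String or a Seq[String]:

// Hypothetical data, just to show the API is identical in both cases.
val flat   = sc.parallelize(Seq(("k1", "a"), ("k2", "b")))                           // RDD[(String, String)]
val nested = sc.parallelize(Seq(("k1", Seq("a", "b")), ("k2", Seq.empty[String])))   // RDD[(String, Seq[String])]

flat.filter   { case (_, v)  => v.nonEmpty }
nested.filter { case (_, vs) => vs.nonEmpty }
nested.map    { case (k, vs) => (k, vs.filter(_ == "a")) }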

Reading such a nested collection from Parquet files can be tricky, though.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Writing the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Reading the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of structs is ArrayBuffer[Row]; you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the getAs method here, but there are alternatives in the latest versions of Spark. See the source code of the Row trait.
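
One thing worth guarding against in practice (my addition, not from the original answer) is a null nested column, which Parquet permits; Row.isNullAt covers that case:

// Same mapping as above, but tolerating rows where the inners column is null.
val outersSafe = dataFrame.map { row =>
  val key = row.getString(0)
  val inners =
    if (row.isNullAt(1)) Seq.empty[Inner]
    else row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
  Outer(key, inners)
}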

Now that you have an RDD[Outer], you can apply any transformation or action you want.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))

Note that we used the Spark SQL library only to read the Parquet file. You could, for example, select only the wanted columns directly on the DataFrame before mapping it to an RDD.

dataFrame.select('col1, 'col2).map { row => ... }
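
With the toy schema above, that could look like the following sketch (mine, under the same assumptions as before); here the projection is a no-op because the DataFrame only has two columns, but on the ~600-column table from the question it avoids reading data you do not need:

// Project only the columns required to build Outer before mapping to an RDD.
dataFrame
  .select('key, 'inners)
  .map { row =>
    val key = row.getString(0)
    val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
    Outer(key, inners)
  }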
