How to read a nested collection in Spark


Problem Description

I have a parquet table with one of the columns being

array<struct<col1,col2,..colN>>

I can run queries against this table in Hive using the LATERAL VIEW syntax.
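
For context, the kind of Hive query meant here looks roughly like the sketch below. The table name (my_table) and column names (key, inners, col1) are placeholders, not the real schema; from Spark, the same statement could be issued through a HiveContext:

// Sketch only: my_table, key, inners and col1 are placeholder names
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val exploded = hiveContext.sql("""
  SELECT key, item.col1
  FROM my_table
  LATERAL VIEW explode(inners) t AS item
""")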

How do I read this table into an RDD, and more importantly, how do I filter, map, etc. this nested collection in Spark?

Could not find any references to this in the Spark documentation. Thanks in advance for any information!

PS. Felt it might be helpful to give some stats on the table. Number of columns in the main table ~600. Number of rows ~200m. Number of "columns" in the nested collection ~10. Avg number of records in the nested collection ~35.

Answer

There is no magic in the case of a nested collection. Spark will handle an RDD[(String, String)] and an RDD[(String, Seq[String])] in the same way.
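
For instance, the nested Seq inside an RDD element is just an ordinary Scala value, so the usual transformations apply. A minimal sketch with made-up data:

// The nested Seq is manipulated with plain Scala collection methods
val pairs: org.apache.spark.rdd.RDD[(String, Seq[String])] =
  sc.parallelize(Seq(("k1", Seq("a", "b")), ("k2", Seq.empty[String])))

val nonEmpty = pairs.filter { case (_, values) => values.nonEmpty }
val upper = pairs.map { case (key, values) => (key, values.map(_.toUpperCase)) }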

Reading such a nested collection from Parquet files can be tricky, though.

Let's take an example from the spark-shell (1.3.1):

scala> import sqlContext.implicits._
import sqlContext.implicits._

scala> case class Inner(a: String, b: String)
defined class Inner

scala> case class Outer(key: String, inners: Seq[Inner])
defined class Outer

Write the parquet file:

scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25

scala> outers.toDF.saveAsParquetFile("outers.parquet")

Read the parquet file:

scala> import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.Row

scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   

scala> val outers = dataFrame.map { row =>
     |   val key = row.getString(0)
     |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
     |   Outer(key, inners)
     | }
outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848

The important part is row.getAs[Seq[Row]](1). The internal representation of a nested sequence of structs is ArrayBuffer[Row]; you could use any super-type of it instead of Seq[Row]. The 1 is the column index in the outer row. I used the method getAs here, but there are alternatives in the latest versions of Spark. See the source code of the Row trait: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala#L118
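
One such alternative, on newer Spark versions, is the typed getter getSeq, which expresses the same extraction a bit more directly. A sketch, reusing the classes defined above:

// Same extraction via Row.getSeq (available in more recent Spark versions)
val outersAlt = dataFrame.map { row =>
  val key = row.getString(0)
  val inners = row.getSeq[Row](1).map(r => Inner(r.getString(0), r.getString(1)))
  Outer(key, inners)
}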

Now that you have an RDD[Outer], you can apply any desired transformation or action.

// Filter the outers
outers.filter(_.inners.nonEmpty)

// Filter the inners
outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
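
And if a flat shape is more convenient than the nested one, flatMap produces one element per inner record. A sketch:

// Flatten: one (key, Inner) pair per nested record
val flattened = outers.flatMap(outer => outer.inners.map(inner => (outer.key, inner)))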

Note that we used the Spark SQL library only to read the parquet file. You could, for example, select only the wanted columns directly on the DataFrame before mapping it to an RDD.

dataFrame.select('col1, 'col2).map { row => ... }
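
As a concrete sketch with the schema from the example above, pruning to the two needed columns before leaving the DataFrame (worthwhile with ~600 columns in the real table):

// Keep only the columns we need before converting to an RDD
val slim = dataFrame.select("key", "inners")
val keyToCount = slim.map(row => (row.getString(0), row.getAs[Seq[Row]](1).size))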

