How to unnest data with SparkR?


Question

Using SparkR, how can nested arrays be "exploded along"? I've tried using explode like so:

dat <- nested_spark_df %>%
    mutate(a = explode(metadata)) %>%
    head()

But though the above does not cause an exception to be thrown, it does not promote the nested fields in metadata to the top level. Essentially, I'm seeking behavior similar to that of Hive's LATERAL VIEW explode() functionality without relying on a HiveContext.

Note that in the code snippet I'm using the NSE enabled via SparkRext. I think the equivalent straight-SparkR would be something like ... %>% mutate(a=explode(nested_spark_df$metadata)) ... or something along those lines.
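For reference, a minimal sketch of that straight-SparkR form as a complete snippet (assuming SparkR 1.5+, where explode() is available as a column function, that nested_spark_df already exists, and that the pipe comes from magrittr):

    # Straight SparkR (no SparkRext NSE); a sketch, not the question's
    # exact code. Assumes nested_spark_df is a SparkR DataFrame with an
    # array column named "metadata".
    library(SparkR)
    library(magrittr)

    dat <- nested_spark_df %>%
        mutate(a = explode(nested_spark_df$metadata)) %>%
        head()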

I've tried using LATERAL VIEW explode(...) in the SparkR::sql function. It seems to work great with Parquet and ORC data. However, when working with nested Avro data I tried:

dat <- collect(sql(HiveContext,
                   paste0("SELECT a.id, ax.arrival_airport, ax.arrival_runway ",
                          "FROM avrodb.flight a ",
                          "LATERAL VIEW explode(a.metadata) a AS ax ",
                          "WHERE ax.arrival_airport='ATL'")))

Only to get the following error, though when I swap out avrodb for parquetdb containing equivalent data it does what I expect:

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava

This despite the fact that I included the Databricks Avro package when starting Spark. Reading the same data with Spark using a SQLContext (instead of the HiveContext) works fine, except that I haven't been able to figure out how to effectively use the explode() function. I've also confirmed that this is not an issue with the data itself by successfully querying the same files via Hive, using the same HQL statement I tried running with SparkR::sql(HiveContext, hql).
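For context, a minimal sketch of reading the Avro data through a plain SQLContext with the spark-avro package; the package coordinates and the file path here are illustrative assumptions, not taken from the question:

    # Sketch: load Avro through a SQLContext using the Databricks
    # spark-avro package. The package version and path below are
    # assumptions for illustration.
    library(SparkR)

    sc <- sparkR.init(sparkPackages = "com.databricks:spark-avro_2.10:2.0.1")
    sqlContext <- sparkRSQL.init(sc)

    nested_spark_df <- read.df(sqlContext,
                               "/data/avrodb/flight",  # hypothetical path
                               source = "com.databricks.spark.avro")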

Answer

Thanks much to @Sim. I finally figured out a sane approach though. The key is that after the explode operation, when all the exploded values are still nested one level deep, a select must be performed. For example:

dat <- nested_spark_df %>%
    mutate(a = explode(nested_spark_df$metadata)) %>%
    select("id", "a.fld1", "a.fld2")

This will result in a SparkR DataFrame object with 3 columns: id, fld1, and fld2 (no a. prepended).
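As a quick sanity check, the flattened schema can be inspected directly (fld1 and fld2 are the illustrative field names from the example above, not from a real dataset):

    # Inspect the result; column names follow the illustrative example.
    printSchema(dat)
    columns(dat)   # e.g. c("id", "fld1", "fld2")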

My mental block was that I was trying to get explode to act like Pig's FLATTEN, which would create a bunch of new field names at the top level of the schema.
