How to unnest data with SparkR?


Problem Description

Using SparkR, how can nested arrays be "exploded along"? I've tried using explode like so:

 dat <- nested_spark_df %>% 
     mutate(a=explode(metadata)) %>%
     head()

Although the above does not cause an exception to be thrown, it does not promote the nested fields in metadata to the top level. Essentially, I'm seeking behavior similar to that of Hive's LATERAL VIEW explode() functionality, without relying on a HiveContext.

Note that in the code snippet I'm using the non-standard evaluation (NSE) enabled via SparkRext. I think the equivalent straight-SparkR would be something like ... %>% mutate(a=explode(nested_spark_df$metadata)) ... or something along those lines.
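For concreteness, here is a minimal sketch of the plain-SparkR form, assuming nested_spark_df is a SparkR DataFrame whose metadata column is an array of structs:

    # Plain SparkR (no SparkRext NSE): column references must be qualified
    # with the DataFrame itself, e.g. nested_spark_df$metadata.
    exploded <- mutate(nested_spark_df, a = explode(nested_spark_df$metadata))
    head(exploded)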

EDIT

I've tried using LATERAL VIEW explode(...) in the SparkR::sql function. It seems to work great with Parquet and ORC data. However, when working with nested Avro data, I tried:

dat <- collect(sql(HiveContext,
                   paste0("SELECT a.id, ax.arrival_airport, ax.arrival_runway ",
                          "FROM avrodb.flight a ",
                          "LATERAL VIEW explode(a.metadata) a AS ax ",
                          "WHERE ax.arrival_airport='ATL'")))

Only to get the following error, though when I swap out avrodb with parquetdb containing equivalent data, it does what I expect.

Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava

This is despite the fact that I included the Databricks Avro package when starting Spark. Reading the same data with Spark using a SQLContext (instead of the HiveContext) works fine, except that I haven't been able to figure out how to effectively use the explode() function. I've also confirmed that this is not an issue with the data itself, by successfully querying the same files via Hive using the same HQL statement I tried running with SparkR::sql(HiveContext, hql).
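For reference, the SQLContext-based read looks roughly like the sketch below; the path is a placeholder, and it assumes the spark-avro package was supplied at startup (e.g. --packages com.databricks:spark-avro_2.10:2.0.1):

    # Read the Avro data through a SQLContext using the Databricks source.
    # The path below is hypothetical -- substitute the actual location.
    sqlContext <- sparkRSQL.init(sc)
    flights <- read.df(sqlContext, "/data/avrodb/flight",
                       source = "com.databricks.spark.avro")
    printSchema(flights)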

Solution

Thanks much to @Sim. I finally figured out a sane approach, though. The key is that a select must be performed after the explode operation, while all the exploded values are still nested one level deep. For example:

 dat <- nested_spark_df %>% 
     mutate(a=explode(nested_spark_df$metadata)) %>%
     select("id", "a.fld1", "a.fld2")

which will result in a SparkR DataFrame object with 3 columns: id, fld1, and fld2 (no a. prepended).
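A quick sanity check, using standard SparkR calls:

    printSchema(dat)  # should show id, fld1, and fld2 at the top level
    head(dat)         # first rows, returned as a local data.frame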

My mental block was that I was trying to get explode to act like Pig's flatten, where it would create a bunch of new field names at the top level of the schema.
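If flatten-like behavior is genuinely needed without naming each field, one possible sketch uses selectExpr with star expansion of the struct column; note this depends on your Spark version supporting "a.*" on struct columns, so treat it as untested:

    # Hypothetical: expand every field of the exploded struct at once.
    # Star expansion of struct columns ("a.*") may not be available on
    # older Spark releases.
    exploded <- mutate(nested_spark_df, a = explode(nested_spark_df$metadata))
    dat <- selectExpr(exploded, "id", "a.*")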
