How to unnest data with SparkR?
Using SparkR, how can nested arrays be "exploded along"? I've tried using explode like so:
dat <- nested_spark_df %>%
mutate(a=explode(metadata)) %>%
head()
but though the above does not cause an exception to be thrown, it does not promote the nested fields in metadata to the top level. Essentially I'm seeking behavior similar to that of Hive's LATERAL VIEW explode() functionality without relying on a HiveContext.
Note that in the code snippet I'm using the NSE enabled via SparkRext. I think the equivalent straight-SparkR would be something like ... %>% mutate(a=explode(nested_spark_df$metadata)) ... or something along those lines.
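For context, a more complete straight-SparkR version of that attempt might look like the following. This is an untested sketch assuming a SparkR 1.x session where sc and sqlContext are already initialized, metadata is an array-of-structs column, and the input path is hypothetical:

    library(SparkR)
    library(magrittr)

    # Read the nested data into a SparkR DataFrame (path is illustrative)
    nested_spark_df <- read.df(sqlContext, "/data/flights", source = "parquet")

    # explode() produces one row per array element, but the new column "a"
    # still holds a struct; its fields are not promoted to the top level
    dat <- nested_spark_df %>%
      mutate(a = explode(nested_spark_df$metadata)) %>%
      head()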
EDIT

I've tried using LATERAL VIEW explode(...) in the SparkR::sql function. It seems to work great with Parquet and ORC data. However, when working with nested Avro data I tried:
dat <- collect(sql(HiveContext,
paste0("SELECT a.id, ax.arrival_airport, ax.arrival_runway ",
"FROM avrodb.flight a ",
"LATERAL VIEW explode(a.metadata) a AS ax ",
"WHERE ax.arrival_airport='ATL'")))
Only to get the following error, though when I swap out avrodb with parquetdb containing equivalent data it does what I expect:
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent failure: Lost task 4.3 in stage 5.0 (TID 1345, dev-dn04.myorg.org): org.apache.avro.AvroTypeException: Found metadata, expecting union
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:219)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
at org.apache.avr
Calls: <Anonymous> ... collect -> collect -> .local -> callJStatic -> invokeJava
This despite the fact that I included the Databricks Avro package when starting Spark. Reading the same data with Spark using a SQLContext (instead of the HiveContext) works fine, except that I haven't been able to figure out how to effectively use the explode() function that way. I've also confirmed that this is not an issue with the data itself: I successfully queried the same files via Hive using the same HQL statement I tried running with SparkR::sql(HiveContext, hql).
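For reference, reading the Avro data through a plain SQLContext looked roughly like this (SparkR 1.x API; the path is illustrative, and the spark-avro package coordinates are an assumption about how the Databricks package was supplied at launch):

    # Spark launched with e.g. --packages com.databricks:spark-avro_2.10:2.0.1
    sqlContext <- sparkRSQL.init(sc)
    avro_df <- read.df(sqlContext,
                       "/data/avrodb/flight",
                       source = "com.databricks.spark.avro")
    printSchema(avro_df)  # metadata shows up as array<struct<...>>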
Thanks much to @Sim. I finally figured out a sane approach, though. The key is that after the explode operation, when all the exploded values are still nested one level deep, a select must be performed. For example:
dat <- nested_spark_df %>%
mutate(a=explode(nested_spark_df$metadata)) %>%
select("id", "a.fld1", "a.fld2")
which will result in a SparkR DataFrame object with 3 columns: id, fld1, and fld2 (no "a." prepended).
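Putting the pieces together, a DataFrame-API equivalent of the earlier LATERAL VIEW query would be roughly as follows. This is an untested sketch that assumes arrival_airport and arrival_runway are fields of the metadata structs, as in the HQL above:

    dat <- nested_spark_df %>%
      mutate(a = explode(nested_spark_df$metadata)) %>%
      # select with "a.<field>" names lifts the struct fields to the top level
      select("id", "a.arrival_airport", "a.arrival_runway") %>%
      # SparkR's filter also accepts a SQL condition string
      filter("arrival_airport = 'ATL'") %>%
      collect()

The select step is what does the actual flattening; explode alone only multiplies the rows.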
My mental block was that I was trying to get explode to act like Pig's flatten, which creates a bunch of new field names at the top level of the schema.