尝试将一个Blob转换为Spark中的多个列 [英] Trying to turn a blob into multiple columns in Spark

查看:73
本文介绍了尝试将一个Blob转换为Spark中的多个列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个序列化的Blob和一个将其转换为Java Map的函数. 我已经将该函数注册为UDF,并尝试在Spark SQL中使用它,如下所示:

I have a serialized blob and a function that converts it into a java Map. I have registered the function as a UDF and tried to use it in Spark SQL as follows:

sqlCtx.udf.register("blobToMap", Utils.blobToMap)
val df = sqlCtx.sql(""" SELECT mp['c1'] as c1, mp['c2'] as c2 FROM
                        (SELECT *, blobToMap(payload) AS mp FROM t1) a """)

我确实做到了,但是由于某种原因,非常重的blobToMap函数每行运行两次,实际上我提取了20个字段,并且每行运行了20次.我在>从Spark DataFrame中的单列 但是它们实际上是不可扩展的-我不想每次需要提取数据时都创建一个类.

I do succeed in doing it, but for some reason the very heavy blobToMap function runs twice for every row, and in reality I extract 20 fields and it runs 20 times for every row. I saw the suggestions in Derive multiple columns from a single column in a Spark DataFrame but they are really not scalable - I don't want to create a class for every time I need to extract data.

如何强制Spark做合理的事情? 我试图分为两个阶段.唯一有效的方法是缓存内部选择-但这不可行,因为它确实是一个很大的blob,而我只需要几十个字段即可.

How can I force Spark to do what's reasonable? I tried to separate to two stages. The only thing that worked was to cache the inner select - but that's not feasible either because it is really a big blob and I need only a few dozen fields from it.

推荐答案

我会回答自己,希望它能对任何人有帮助..因此,经过数十次实验,我得以强制使用spark来评估udf并将其转换为Map一次,而不是针对每个关键请求一遍又一遍地重新计算它,而是通过拆分查询并做一个丑陋的恶作剧-将其转换为RDD并返回到DataFrame:

I'll answer myself hoping it will help anyone.. so after dozens of experiments I was able to force spark to evaluate the udf and turn it into a Map once, instead of recalculating it over and over again for every key request, by splitting the query and doing an evil ugly trick - turning it ti RDD and back to DataFrame:

val df1 = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1")
sqlCtx.createDataFrame(df.rdd, df.schema).registerTempTable("t1_with_mp")
val final_df = sqlCtx.sql("SELECT mp['c1'] as c1, mp['c2'] as c2 FROM t1_with_mp")

这篇关于尝试将一个Blob转换为Spark中的多个列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆