How to return complex types using Spark UDFs


Question

Hello, and thank you.

My program is written in Java and I cannot move to Scala.

I am currently working with a Spark DataFrame extracted from a JSON file using the following line:

DataFrame dff = sqlContext.read().json("filePath.json");

SQLContext and SparkContext are correctly initialized and running perfectly.

The problem is that the JSON I'm reading from has nested structs, and I want to clean/verify the inner data without changing the schema.

In particular, one of the DataFrame's columns has the type "GenericRowWithSchema".

Let's say I want to clean only that column, named "data".

The solution that came to mind was to define a User Defined Function (UDF) named "cleanDataField" and then run it over the column "data". Here's the code:

UDF1<GenericRowWithSchema, GenericRowWithSchema> cleanDataField =
        new UDF1<GenericRowWithSchema, GenericRowWithSchema>() {
    public GenericRowWithSchema call(GenericRowWithSchema grws) {
        cleanGenericRowWithSchema(grws);
        return grws;
    }
};

Then I would register the function in the SQLContext:

sqlContext.udf().register("cleanDataField", cleanDataField, DataTypes.StringType);

Then I would call

df.selectExpr("cleanDataField(data)").show(10, false);

in order to see the first 10 rows with the clean data.

In the end, the question comes down to this: can I return complex data (such as a custom class object) from a UDF? If it is possible, how should I do it? I guess I have to change the UDF registration's third parameter, because I'm not returning a string, but what should I replace it with?

Thanks

Answer

Let's say you want to construct a data type such as struct<companyid:string,loyaltynum:int,totalprice:int,itemcount:int>.

For this, you can do the following:

    // The schema JSON must be escaped as a Java string literal:
    String schemaJson = "{\"type\":\"struct\",\"fields\":["
        + "{\"name\":\"companyid\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"loyaltynum\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"totalprice\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"itemcount\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}";
    DataType dt = DataType.fromJson(schemaJson);

You can then use that data type as the return type when registering your UDF.
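As a sketch of an alternative to the JSON route (not from the original answer), the same struct type can be built programmatically with the `DataTypes` factory methods, which avoids the string-escaping issue entirely. The `SchemaExample` class name and the commented-out registration line are illustrative; the registration assumes the `cleanDataField` UDF and `sqlContext` from the question.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaExample {
    public static void main(String[] args) {
        // Build struct<companyid:string,loyaltynum:int,totalprice:int,itemcount:int>
        // field by field: name, type, nullable.
        StructType dt = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("companyid", DataTypes.StringType, false),
            DataTypes.createStructField("loyaltynum", DataTypes.IntegerType, false),
            DataTypes.createStructField("totalprice", DataTypes.IntegerType, false),
            DataTypes.createStructField("itemcount", DataTypes.IntegerType, false)
        });

        System.out.println(dt.simpleString());

        // Pass the StructType as the third argument when registering the UDF,
        // in place of DataTypes.StringType (requires a live SQLContext):
        // sqlContext.udf().register("cleanDataField", cleanDataField, dt);
    }
}
```

Either way, the registered UDF then returns a `Row` whose layout matches the declared struct.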
