How to add new column to Spark dataframe using a Java UDF

Problem description

I have a Dataset<Row> inputDS with 4 columns, namely Id, List<Long> time, List<String> value, and aggregateType. I want to add one more column, value_new, to the Dataset using a map function. That map function takes the columns time, value, and aggregateType, passes them to a function getAggregate(String aggregateType, List<Long> time, List<String> value), and returns a double value computed from those parameters. The Double returned by getAggregate will be the value of the new column, i.e. the value of value_new.

Dataset inputDS

 +------+---------------+---------------------------------------------+---------------+
 |    Id| value         |     time                                    |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

Expected Dataset outputDS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+
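
The question does not show getAggregate itself; for context, here is a hypothetical sketch consistent with the sample rows above, where "Sum" simply totals the value list:

 // Hypothetical helper (not shown in the question): "Sum" totals the
 // string-encoded values; other aggregate types would be handled similarly.
 static double getAggregate(String aggregateType, List<Long> time, List<String> value) {
     if ("Sum".equals(aggregateType)) {
         double sum = 0.0;
         for (String v : value) {
             sum += Double.parseDouble(v);
         }
         return sum; // e.g. [1.5, 3.4, 4.5] -> 9.4
     }
     throw new IllegalArgumentException("Unsupported aggregateType: " + aggregateType);
 }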

Code that I have tried:

 inputDS.withColumn("value_new", functions.lit(inputDS.map(new MapFunction<Row, Double>() {

     public Double call(Row row) {
         String aggregateType = row.getAs("aggregateType");
         List<Long> timeList = row.getList(row.fieldIndex("time"));
         List<String> valueList = row.getList(row.fieldIndex("value"));

         return getAggregate(aggregateType, timeList, valueList);
     }
 }, Encoders.DOUBLE())));

Error

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

Note: sorry if I have used the map function wrongly; please suggest a workaround if there is one.

Thanks.

Recommended answer

You get the error because you are trying to create a function literal (lit()) from the result of Dataset.map(), which, as you can see in the docs, is itself a Dataset. You can see in the API for Dataset.withColumn() that the second argument needs to be a Column.
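
For contrast, lit() only wraps a plain constant value into a Column, e.g.:

 // lit() turns a constant into a Column; it cannot wrap a Dataset.
 inputDS.withColumn("constant", functions.lit(1.0));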

It seems like you need to create a user-defined function. Take a look at How do I call a UDF on a Spark DataFrame using JAVA?
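
For illustration, here is a minimal sketch of that approach, assuming Spark 2.x, an existing SparkSession named spark, and that getAggregate is in scope; the UDF name getAggregateUDF is made up. Array columns reach a Java UDF as Scala WrappedArray, so the sketch copies them into java.util.List before calling getAggregate:

 import java.util.ArrayList;
 import java.util.List;

 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.api.java.UDF3;
 import org.apache.spark.sql.types.DataTypes;

 import scala.collection.mutable.WrappedArray;

 import static org.apache.spark.sql.functions.callUDF;
 import static org.apache.spark.sql.functions.col;

 // Register a 3-argument UDF that computes the aggregate for each row.
 spark.udf().register("getAggregateUDF",
         (UDF3<String, WrappedArray<Long>, WrappedArray<String>, Double>)
         (aggregateType, time, value) -> {
             // Copy the Scala arrays into java.util.List for getAggregate.
             List<Long> timeList = new ArrayList<>();
             for (int i = 0; i < time.size(); i++) {
                 timeList.add(time.apply(i));
             }
             List<String> valueList = new ArrayList<>();
             for (int i = 0; i < value.size(); i++) {
                 valueList.add(value.apply(i));
             }
             return getAggregate(aggregateType, timeList, valueList);
         },
         DataTypes.DoubleType);

 // Apply the UDF to the three input columns to build the new column.
 Dataset<Row> outputDS = inputDS.withColumn("value_new",
         callUDF("getAggregateUDF", col("aggregateType"), col("time"), col("value")));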

