Spark - Java UDF returning multiple columns
I'm using SparkSQL 1.6.2 (Java API) and I have to process the following DataFrame, which holds a list of values in each of 2 columns:
ID AttributeName AttributeValue
0 [an1,an2,an3] [av1,av2,av3]
1 [bn1,bn2] [bv1,bv2]
The desired table is:
ID AttributeName AttributeValue
0 an1 av1
0 an2 av2
0 an3 av3
1 bn1 bv1
1 bn2 bv2
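Stripped of the Spark machinery, the transformation I'm after is just a pairwise expansion of the two arrays into one row per pair. A minimal plain-Java sketch of that logic (names are illustrative, no Spark involved):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExplodePairs {
    // Emits one (id, attributeName, attributeValue) row per index,
    // pairing the i-th element of each list.
    static List<String[]> explodePairs(String id, List<String> names, List<String> values) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            rows.add(new String[]{id, names.get(i), values.get(i)});
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String[]> rows = explodePairs("0",
                Arrays.asList("an1", "an2", "an3"),
                Arrays.asList("av1", "av2", "av3"));
        for (String[] r : rows) {
            System.out.println(String.join(" ", r)); // e.g. "0 an1 av1"
        }
    }
}
```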
I think I have to use a combination of the explode function and a custom UDF.
I found the following resources:
- Explode (transpose?) multiple columns in Spark SQL table
- How do I call a UDF on a Spark DataFrame using JAVA?
and I can successfully run an example that reads the two columns and returns the concatenation of the first two strings as a single column:
UDF2<Seq<String>, Seq<String>, String> combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
    public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        return col1.apply(0) + col2.apply(0);
    }
};
context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
The problem is how to write the signature of a UDF returning two columns (in Java). As far as I understand, I must define a new StructType such as the one shown below and set that as the return type, but so far I haven't managed to get the final code working:
StructType retSchema = new StructType(new StructField[]{
    new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
    new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
});
context.udf().register("combineUDF", combineUDF, retSchema);
Any help will be really appreciated.
UPDATE: I'm trying to implement the zip(AttributeName, AttributeValue) first, so that afterwards I only need to apply the standard explode function in SparkSQL:
ID AttName_AttValue
0 [[an1,av1],[an2,av2],[an3,av3]]
1 [[bn1,bv1],[bn2,bv2]]
I built the following UDF:
UDF2<Seq<String>, Seq<String>, List<List<String>>> combineColumns = new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
    public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        List<List<String>> zipped = new LinkedList<>();
        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
            zipped.add(subRow);
        }
        return zipped;
    }
};
But when I run the code
myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);
I got the following error message:
scala.MatchError: [[an1,av1],[an1,av2],[an3,av3]] (of class java.util.LinkedList)
and it looks like the combining was performed correctly, but the returned java.util.LinkedList is not a type Spark can map to an array column.
Any help?
Finally I managed to get the result I was looking for but probably not in the most efficient way.
Basically there are 2 steps:
- Zip the two lists
- Explode the list into rows
For the first step I defined the following UDF:
UDF2<Seq<String>, Seq<String>, Seq<String>> concatItems = new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        ArrayList<String> zipped = new ArrayList<>();
        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            String subRow = col1.apply(i) + ";" + col2.apply(i);
            zipped.add(subRow);
        }
        return scala.collection.JavaConversions.asScalaBuffer(zipped);
    }
};
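The core of concatItems, minus the Scala Seq plumbing, is plain pairwise string concatenation. As a standalone sketch (illustrative, not Spark code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipConcat {
    // Joins the i-th elements of both lists with the given separator,
    // mirroring what the concatItems UDF computes per row.
    static List<String> zipConcat(List<String> col1, List<String> col2, String sep) {
        List<String> zipped = new ArrayList<>();
        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            zipped.add(col1.get(i) + sep + col2.get(i));
        }
        return zipped;
    }

    public static void main(String[] args) {
        System.out.println(zipConcat(Arrays.asList("an1", "an2"), Arrays.asList("av1", "av2"), ";"));
        // → [an1;av1, an2;av2]
    }
}
```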
Don't forget to register the function with the SparkSession; since the UDF returns an array of strings, the declared return type must be an ArrayType:
sparkSession.udf().register("concatItems", concatItems, DataTypes.createArrayType(DataTypes.StringType));
and then I called it with the following code:
DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));
At this stage df2 looks like this:
ID AttName_AttValue
0 [an1;av1, an2;av2, an3;av3]
1 [bn1;bv1, bn2;bv2]
Then I used the standard explode function to turn the list into rows:
DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));
At this stage df3 looks like this:
ID AttName_AttValue_row
0 an1;av1
0 an2;av2
0 an3;av3
1 bn1;bv1
1 bn2;bv2
Finally, to split the attribute name and value into two different columns, I converted the DataFrame into a JavaRDD in order to use the map function:
JavaRDD<Row> df3RDD = df3.toJavaRDD().map(
    (Function<Row, Row>) myRow -> {
        String[] info = String.valueOf(myRow.get(1)).split(";");
        return RowFactory.create(myRow.get(0), info[0], info[1]);
    }).cache();
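One detail in the map step: the separator passed to split must match the one used in concatItems (";" above), and ";" is also safer than "," if attribute values may themselves contain commas. A standalone plain-Java sketch of that split (illustrative, no Spark):

```java
import java.util.regex.Pattern;

public class SplitPair {
    // Splits a "name<sep>value" string back into its two fields.
    // Pattern.quote escapes the separator (split takes a regex),
    // and limit = 2 keeps any further separators inside the value field.
    static String[] splitPair(String joined, String sep) {
        return joined.split(Pattern.quote(sep), 2);
    }

    public static void main(String[] args) {
        String[] info = splitPair("an1;av1", ";");
        System.out.println(info[0] + " / " + info[1]); // an1 / av1
    }
}
```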
If anybody has a better solution feel free to comment. I hope it helps.