Adding a new column to a DataFrame with a literal value of type set


Problem Description

Map<File, Dataset<Row>> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", ???, false, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);

for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    // files is the List<File> (defined elsewhere) that maps each file to its index
    Integer fileIndex = files.indexOf(entry.getKey());
    allFilesWords.unionAll(
            allWords.get(entry.getKey()).withColumn("files", ???)
    );
}


In the code above, allWords represents a mapping from a file to its word counts (Row: (string, integer)). Now, I want to aggregate the results for all files into one DataFrame while keeping track of the original file each word was mentioned in. Since, in the end, each word might have been mentioned in multiple files, the files column is designed to be of type set of integers (assuming files are mapped to integers). Now, I'm trying to add a new column to the allWords DataFrames and then use unionAll to merge them all together.


But I don't know how to define and initialize the new column (named files here) with a set holding only one item, fileIndex.


Thanks to the link provided in the comments, I know I should be using functions.typedLit, but this function asks for a second parameter and I don't know what to provide for it. Also, I don't know how to define the column. One last thing: the provided link is in Python, while I'm looking for the Java API.

Answer


I've found the solution myself (with some help):

// Additional imports used below (assumed; the original snippet omitted them):
import java.util.Collections;
import scala.collection.JavaConverters;
import scala.collection.Seq;

Map<File, Dataset<Row>> allWords = ...
StructField[] structFields = new StructField[] {
        new StructField("word", DataTypes.StringType, false, Metadata.empty()),
        new StructField("count", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("files", DataTypes.createArrayType(DataTypes.IntegerType), true, Metadata.empty())
};
StructType structType = new StructType(structFields);

Dataset<Row> allFilesWords = spark.createDataFrame(new ArrayList<>(), structType);
for (Map.Entry<File, Dataset<Row>> entry : allWords.entrySet()) {
    Integer fileIndex = files.indexOf(entry.getKey());
    // Build a one-element Scala Seq for the literal (one way to do it; the
    // original snippet left seq undefined)
    Seq<Integer> seq =
            JavaConverters.asScalaBufferConverter(Collections.singletonList(fileIndex)).asScala();
    // unionAll returns a new Dataset, so reassign to actually accumulate the results
    allFilesWords = allFilesWords.unionAll(
            allWords.get(entry.getKey())
                    .withColumn("files", functions.typedLit(seq, MyTypeTags.SeqInteger()))
    );
}


The problem was that TypeTag is a compile-time artifact of Scala and, based on what I got in this other question, it needs to be generated by the Scala compiler; there's no way to generate one in Java. So, I had to define my custom data structure's TypeTag in a Scala file and add it to my Maven Java project. For that, I followed this article.
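
For the Maven side, the Scala file has to be compiled alongside the Java sources. Below is a minimal sketch, assuming the commonly used scala-maven-plugin; the plugin choice, version, and configuration are assumptions on my part (not from the original post), and you also need a scala-library dependency matching your Spark build's Scala version:

<!-- Sketch (assumed setup): have Maven compile src/main/scala (MyTypeTags.scala)
     together with the Java sources -->
<plugin>
    <groupId>net.alchim31.maven</groupId>
    <artifactId>scala-maven-plugin</artifactId>
    <version>4.8.1</version> <!-- illustrative version -->
    <executions>
        <execution>
            <goals>
                <goal>add-source</goal> <!-- registers src/main/scala as a source root -->
                <goal>compile</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Once this compiles, the Scala object is reachable from Java through its generated static forwarder, i.e. MyTypeTags.SeqInteger().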

Here is my MyTypeTags.scala file:

import scala.reflect.runtime.universe._

// Exposes a TypeTag for Seq[Integer]; Scala objects get static forwarders,
// so Java code can call this as MyTypeTags.SeqInteger()
object MyTypeTags {
  val SeqInteger = typeTag[Seq[Integer]]
}
