How to transform a csv string into a Spark-ML compatible Dataset&lt;Row&gt; format?
Question
I have a Dataset&lt;Row&gt; df that contains two columns ("key" and "value") of type string. df.printSchema() gives me the following output:
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
The content of the value column is actually a CSV-formatted line (coming from a Kafka topic), with the last entry of each line representing the class label and all previous entries being the features (the header row is not included in the dataset):
feature0,feature1,label
0.6720004294237854,-0.4033586564886893,0
0.6659082469383558,0.07688976580256132,0
0.8086502311695247,0.564354801275521,1
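For context, a Dataset&lt;Row&gt; with exactly this key/value shape can be obtained from the topic via Spark's batch Kafka source (available since Spark 2.2, requires the spark-sql-kafka-0-10 package). A minimal sketch, assuming spark is an existing SparkSession; the broker address and topic name are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Read the topic as a batch and cast the binary key/value columns to string.
Dataset<Row> df = spark.read()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
    .option("subscribe", "some-topic")                   // placeholder
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");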
Since I would like to train a classifier on this data, I need to transform this representation into one column of type dense vector, containing all the feature values, and one column of type double, containing the label value:
root
|-- indexedFeatures: vector (nullable = false)
|-- indexedLabel: double (nullable = false)
How can I do this using Java 1.8 and Spark 2.2.0?
I got further, but while attempting to make it work with a flexible number of feature dimensions, I got stuck again. I created a follow-up question.
Answer
A VectorAssembler (javadocs) can transform the dataset into the required format.
First, the input is split into three columns:
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

Dataset<FeaturesAndLabelData> featuresAndLabelData = inputDf.select("value").as(Encoders.STRING())
    .flatMap(s -> {
        String[] splitted = s.split(",");
        if (splitted.length == 3) {
            // Parse the two features and the label from the csv line.
            return Collections.singleton(new FeaturesAndLabelData(
                Double.parseDouble(splitted[0]),
                Double.parseDouble(splitted[1]),
                Integer.parseInt(splitted[2]))).iterator();
        } else {
            // Skip malformed lines; apply some error handling here if needed.
            return Collections.emptyIterator();
        }
    }, Encoders.bean(FeaturesAndLabelData.class));
The result is then transformed by a VectorAssembler:
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

VectorAssembler assembler = new VectorAssembler()
    .setInputCols(new String[] { "feature1", "feature2" })
    .setOutputCol("indexedFeatures");

Dataset<Row> result = assembler.transform(featuresAndLabelData)
    .withColumn("indexedLabel", functions.col("label").cast("double"))
    .select("indexedFeatures", "indexedLabel");
The resulting dataframe has the required format:
+----------------------------------------+------------+
|indexedFeatures |indexedLabel|
+----------------------------------------+------------+
|[0.6720004294237854,-0.4033586564886893]|0.0 |
|[0.6659082469383558,0.07688976580256132]|0.0 |
|[0.8086502311695247,0.564354801275521] |1.0 |
+----------------------------------------+------------+
root
|-- indexedFeatures: vector (nullable = true)
|-- indexedLabel: double (nullable = true)
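From here, result can be fed directly into a Spark-ML classifier. A minimal sketch, assuming a logistic regression model (any classifier exposing setFeaturesCol/setLabelCol would be wired up the same way):

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;

// Point the classifier at the assembled feature vector and the double label.
LogisticRegression lr = new LogisticRegression()
    .setFeaturesCol("indexedFeatures")
    .setLabelCol("indexedLabel");

LogisticRegressionModel model = lr.fit(result);
model.transform(result).show(); // adds rawPrediction, probability, prediction columns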
FeaturesAndLabelData is a simple Java bean to make sure that the column names are correct:
public class FeaturesAndLabelData {
    private double feature1;
    private double feature2;
    private int label;

    public FeaturesAndLabelData() {} // no-arg constructor required by Encoders.bean

    public FeaturesAndLabelData(double feature1, double feature2, int label) {
        this.feature1 = feature1; this.feature2 = feature2; this.label = label;
    }

    // getters and setters...
}