How to transform a CSV string into a Spark-ML compatible Dataset<Row> format?

Question

I have a Dataset<Row> df that contains two columns ("key" and "value") of type string. df.printSchema() gives the following output:

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)

The content of the value column is actually a CSV-formatted line (coming from a Kafka topic). The last entry of each line is the class label and all preceding entries are the features (the header row shown first is not part of the dataset):

feature0,feature1,label
0.6720004294237854,-0.4033586564886893,0
0.6659082469383558,0.07688976580256132,0
0.8086502311695247,0.564354801275521,1

Since I would like to train a classifier on this data, I need to transform this representation into a column of type dense vector containing all the feature values and a column of type double containing the label value:

root
 |-- indexedFeatures: vector (nullable = false)
 |-- indexedLabel: double (nullable = false)

How can I do this using Java 1.8 and Spark 2.2.0?

I got further, but while attempting to make it work with a flexible number of feature dimensions, I got stuck again. I created a follow-up question.

Recommended answer

A VectorAssembler (javadocs) can transform the dataset into the required format.

First, the input is split into three columns:

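import java.util.Collections;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Parse each CSV line of the "value" column into a typed bean.
// The explicit cast selects the Java overload of flatMap.
Dataset<FeaturesAndLabelData> featuresAndLabelData = inputDf.select("value").as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, FeaturesAndLabelData>) s -> {
    String[] splitted = s.split(",");
    if (splitted.length == 3) {
      return Collections.singleton(new FeaturesAndLabelData(
        Double.parseDouble(splitted[0]),
        Double.parseDouble(splitted[1]),
        Integer.parseInt(splitted[2]))).iterator();
    } else {
      // apply some error handling...
      return Collections.emptyIterator();
    }
  }, Encoders.bean(FeaturesAndLabelData.class));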

The result is then transformed by a VectorAssembler:

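import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Combine the two feature columns into one vector column and cast the label to double.
VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[] { "feature1", "feature2" })
  .setOutputCol("indexedFeatures");
Dataset<Row> result = assembler.transform(featuresAndLabelData)
  .withColumn("indexedLabel", functions.col("label").cast("double"))
  .select("indexedFeatures", "indexedLabel");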

The result dataframe has the required format:

+----------------------------------------+------------+
|indexedFeatures                         |indexedLabel|
+----------------------------------------+------------+
|[0.6720004294237854,-0.4033586564886893]|0.0         |
|[0.6659082469383558,0.07688976580256132]|0.0         |
|[0.8086502311695247,0.564354801275521]  |1.0         |
+----------------------------------------+------------+

root
 |-- indexedFeatures: vector (nullable = true)
 |-- indexedLabel: double (nullable = true)

FeaturesAndLabelData is a simple Java bean to make sure that the column names are correct:

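public class FeaturesAndLabelData {
  private double feature1;
  private double feature2;
  private int label;

  // Encoders.bean needs a public no-arg constructor plus getters and setters.
  public FeaturesAndLabelData() {}

  // Convenience constructor used in the flatMap step.
  public FeaturesAndLabelData(double feature1, double feature2, int label) {
    this.feature1 = feature1; this.feature2 = feature2; this.label = label;
  }

  //getters and setters...
}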
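
The question mentions wanting a flexible number of feature dimensions, which the fixed three-column bean above does not cover. As a rough sketch only, in plain Java with no Spark dependency, a line could be split so that the last entry becomes the label and everything before it becomes the feature array. The names CsvLineParser, ParsedLine and parse are made up for illustration and are not part of Spark. In a Spark job the resulting array could then be wrapped with Vectors.dense(...) from org.apache.spark.ml.linalg, which is not shown here.

```java
import java.util.Optional;

// Hypothetical helper, not part of Spark: splits one CSV line into features and label.
final class CsvLineParser {

  // Simple holder for the parsed values (illustrative name).
  static final class ParsedLine {
    final double[] features;
    final double label;

    ParsedLine(double[] features, double label) {
      this.features = features;
      this.label = label;
    }
  }

  // Returns an empty Optional for lines with fewer than two entries
  // or with a non-numeric entry, instead of throwing.
  static Optional<ParsedLine> parse(String line) {
    String[] parts = line.split(",");
    if (parts.length < 2) {
      return Optional.empty();
    }
    try {
      // Every entry except the last one is a feature.
      double[] features = new double[parts.length - 1];
      for (int i = 0; i < features.length; i++) {
        features[i] = Double.parseDouble(parts[i].trim());
      }
      // The last entry is the class label.
      double label = Double.parseDouble(parts[parts.length - 1].trim());
      return Optional.of(new ParsedLine(features, label));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }
}
```

Called on the header row feature0,feature1,label, parse returns an empty Optional, so such a row would be skipped rather than fail the job.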
