如何在 Scala 的 Apache Spark 中将数据帧转换为数据集? [英] How to convert a dataframe to dataset in Apache Spark in Scala?

查看:36
本文介绍了如何在 Scala 的 Apache Spark 中将数据帧转换为数据集?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要将数据框转换为数据集,我使用了以下代码:

I need to convert my dataframe to a dataset and I used the following code:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF=trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    ///lets create our liner regression
    val lir= new LinearRegression()
    .setRegParam(0.3)
    .setElasticNetParam(0.8)
    .setMaxIter(100)
    .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]

我的问题是,我收到以下错误:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val Training_ds = TrainingDF.as[df_ds]

似乎数据框中的值数量与我班级中的值数量不同.但是,我在 TrainingDF 数据帧上使用了 case class df_ds(features:Vector, label:Integer),因为它有一个特征向量和一个整数标签.这是 TrainingDF 数据框:

It seems that the number of values in dataframe is different with the number of value in my class. However I am using case class df_ds(features:Vector, label:Integer) on my TrainingDF dataframe since, It has a vector of features and an integer label. Here is TrainingDF dataframe:

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,10...|   10|
+--------------------+-----+

这里也是我的原始 final_df 数据框:

Also here is my original final_df dataframe:

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.3|     10.0.0.2|   10|
+------------+-----------+-------------+-----+

但是我遇到了提到的错误!有谁能够帮助我?提前致谢.

However I got the mentioned error! Can anybody help me? Thanks in advance.

推荐答案

您正在阅读的错误消息是一个很好的指针.

The error message you are reading is a pretty good pointer.

当您将 DataFrame 转换为 Dataset 时,对于 DataFrame<中存储的任何内容,您都必须有一个合适的 Encoder/code> 行.

When you convert a DataFrame to a Dataset you have to have a proper Encoder for whatever is stored in the DataFrame rows.

类原始类型(Ints、Strings 等)和 case 类 的编码器仅通过导入您的 SparkSession 隐含如下:

Encoders for primitive-like types (Ints, Strings, and so on) and case classes are provided by just importing the implicits for your SparkSession like follows:

case class MyData(intField: Int, boolField: Boolean) // e.g.

val spark: SparkSession = ???
val df: DataFrame = ???

import spark.implicits._

val ds: Dataset[MyData] = df.as[MyData]

如果这不起作用,要么是因为您尝试将 DataFrame 转换为 的类型不受支持.在这种情况下,您必须编写自己的 Encoder:您可以找到有关它的更多信息 此处 并查看示例(java.time.LocalDateTimeEncoder)这里.

If that doesn't work either is because the type you are trying to cast the DataFrame to isn't supported. In that case, you would have to write your own Encoder: you may find more information about it here and see an example (the Encoder for java.time.LocalDateTime) here.

这篇关于如何在 Scala 的 Apache Spark 中将数据帧转换为数据集?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆