How to convert a dataframe to dataset in Apache Spark in Scala?

Problem description

I need to convert my dataframe to a dataset and I used the following code:

    val final_df = Dataframe.withColumn(
      "features",
      toVec4(
        // casting into Timestamp to parse the string, and then into Int
        $"time_stamp_0".cast(TimestampType).cast(IntegerType),
        $"count",
        $"sender_ip_1",
        $"receiver_ip_2"
      )
    ).withColumn("label", (Dataframe("count"))).select("features", "label")

    final_df.show()

    val trainingTest = final_df.randomSplit(Array(0.3, 0.7))
    val TrainingDF = trainingTest(0)
    val TestingDF = trainingTest(1)
    TrainingDF.show()
    TestingDF.show()

    // let's create our linear regression
    val lir = new LinearRegression()
      .setRegParam(0.3)
      .setElasticNetParam(0.8)
      .setMaxIter(100)
      .setTol(1E-6)

    case class df_ds(features:Vector, label:Integer)
    org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

    val Training_ds = TrainingDF.as[df_ds]

My problem is that I get the following error:

Error:(96, 36) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val Training_ds = TrainingDF.as[df_ds]

It seems that the number of values in the dataframe is different from the number of values in my class. However, I am using case class df_ds(features:Vector, label:Integer) on my TrainingDF dataframe since it has a vector of features and an integer label. Here is the TrainingDF dataframe:

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,19...|   19|
|[1.497325796E9,10...|   10|
+--------------------+-----+

Also here is my original final_df dataframe:

+------------+-----------+-------------+-----+
|time_stamp_0|sender_ip_1|receiver_ip_2|count|
+------------+-----------+-------------+-----+
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.2|     10.0.0.3|   19|
|    05:49:56|   10.0.0.3|     10.0.0.2|   10|
+------------+-----------+-------------+-----+

However I got the mentioned error! Can anybody help me? Thanks in advance.

Answer

The error message you are reading is a pretty good pointer.

When you convert a DataFrame to a Dataset you have to have a proper Encoder for whatever is stored in the DataFrame rows.

Encoders for primitive-like types (Ints, Strings, and so on) and case classes are provided by simply importing the implicits for your SparkSession, as follows:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class MyData(intField: Int, boolField: Boolean) // e.g.

val spark: SparkSession = ???
val df: DataFrame = ???

import spark.implicits._

val ds: Dataset[MyData] = df.as[MyData]
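
Applied to the code in the question, a minimal sketch of the fix might look like the following (this assumes TrainingDF and the df_ds case class from the question; the key points are the ML Vector import, moving the case class to the top level, and importing the implicits before calling .as):

import org.apache.spark.ml.linalg.Vector // the ML Vector, not scala.collection.immutable.Vector

// Defined at the top level (or in an object), not inside a method, so the
// encoder can be derived without the OuterScopes workaround.
case class df_ds(features: Vector, label: Integer)

import spark.implicits._

val Training_ds: Dataset[df_ds] = TrainingDF.as[df_ds]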

If that doesn't work either, it is because the type you are trying to cast the DataFrame to isn't supported. In that case, you would have to write your own Encoder: you may find more information about it here and see an example (the Encoder for java.time.LocalDateTime) here.
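
A widely used fallback when no built-in encoder exists is Encoders.kryo. A sketch, using a hypothetical SensorReading class purely for illustration (note that kryo-encoded values are stored as a single opaque binary column, so Catalyst cannot optimize or query individual fields):

import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical class with no built-in encoder, used only for illustration.
class SensorReading(val id: String, val value: Double) extends Serializable

// Kryo serializes the whole object into one binary column: you lose
// columnar optimizations, but you get a working Encoder in one line.
implicit val sensorEncoder: Encoder[SensorReading] = Encoders.kryo[SensorReading]

With the implicit in scope, spark.createDataset and .as[SensorReading] will pick it up automatically.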
