无法使用Scala在Apache Spark中执行用户定义的函数 [英] Failed to execute user defined function in Apache Spark using Scala

查看:54
本文介绍了无法使用Scala在Apache Spark中执行用户定义的函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下数据框:

+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|   time_stamp_0|sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6| len_7|count|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
|06:36:16.293711|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65161|  130|
|06:36:16.293729|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58| 65913|  130|
|06:36:16.293743|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|131073|  130|
|06:36:16.293765|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196233|  130|
|06:36:16.293783|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|196985|  130|
|06:36:16.293798|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|262145|  130|
|06:36:16.293820|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|327305|  130|
|06:36:16.293837|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|328057|  130|
|06:36:16.293851|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|393217|  130|
|06:36:16.293873|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|458377|  130|
|06:36:16.293890|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|459129|  130|
|06:36:16.293904|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|524289|  130|
|06:36:16.293926|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|589449|  130|
|06:36:16.293942|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|590201|  130|
|06:36:16.293956|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|655361|  130|
|06:36:16.293977|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|720521|  130|
|06:36:16.293994|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|721273|  130|
|06:36:16.294007|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|786433|  130|
|06:36:16.294028|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|851593|  130|
|06:36:16.294045|   10.0.0.1|     10.0.0.2|   55518|    5001|       0|      58|852345|  130|
+---------------+-----------+-------------+--------+--------+--------+--------+------+-----+
only showing top 20 rows

我必须在我的 dataframe 中添加功能和标签以预测计数值.但是,当我运行代码时,我将看到以下错误:

I have to add features and label to my dataframe to predict the count value. However as I ran the code I will see the below error:

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

我还 cast(IntegerType)我所有的功能,但是再次发生错误.这是我的代码:

I also cast(IntegerType) all my features but again the error occurs. Here is my code:

val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

       val toVec9 = udf[Vector, Int, Int, String, String, Int, Int, Int, Int, Int] { (a, b, c, d, e, f, g, h, i) =>
              val e3 = c match {
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              }

              val e4 = d match {
                case "10.0.0.1" => 1
                case "10.0.0.2" => 2
                case "10.0.0.3" => 3
              }
              Vectors.dense(a, b, e3, e4, e, f, g, h, i)
            }

            val final_df = Dataframe.withColumn(
              "features",
              toVec9(
                // casting into Timestamp to parse the string, and then into Int
                $"time_stamp_0".cast(TimestampType).cast(IntegerType),
                $"count".cast(IntegerType),
                $"sender_ip_1",
                $"receiver_ip_2",
                $"s_port_3".cast(IntegerType),
                $"r_port_4".cast(IntegerType),
                $"acknum_5".cast(IntegerType),
                $"winnum_6".cast(IntegerType),
                $"len_7".cast(IntegerType)
              )
            ).withColumn("label", (Dataframe("count"))).select("features", "label")

final_df.show()

final_df.show()

val trainingTest = final_df.randomSplit(Array(0.8, 0.2))
val TrainingDF = trainingTest(0).toDF()
val TestingDF=trainingTest(1).toDF()
TrainingDF.show()
TestingDF.show()

我的依赖项还包括:

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" %% "spark-core" % "2.1.1",
  "org.apache.spark" %% "spark-sql" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" %% "spark-streaming" % "2.1.1",
  "org.apache.spark" %% "spark-mllib" % "2.1.1"
)

最有趣的一点是,如果我在代码的最后一部分将所有 cast(IntegerType)更改为 cast(TimestampType).cast(IntegerType),则错误消失,输出将如下所示:

The funnest point is that if I change all my cast(IntegerType) to cast(TimestampType).cast(IntegerType) in the last part of my code, the error disappear and the output will be something like this:

+--------+-----+
|features|label|
+--------+-----+
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
|    null|  130|
+--------+-----+

更新:应用@Ramesh Maharjan解决方案后,我的数据框的结果运行良好,但是,每当我尝试将final_df数据框拆分为训练和测试结果时,结果如下所示,但我仍然可以存在空行的同样问题.

UPDATE: After applying @Ramesh Maharjan solution the result of my dataframe works well but, whenever I try to splitting my final_df dataframe into training and testing the result is something like below and I still have the same problem of having null rows.

+--------------------+-----+
|            features|label|
+--------------------+-----+
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|                null|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
|[1.497587776E9,13...|  130|
+--------------------+-----+

你能帮我吗?

推荐答案

我没有在问题代码中生成 count列.除了 count 列之外,@ Shankar的答案还应为您提供所需的结果.

I didn't see count column being generated in your question code. Apart from count column @Shankar's answer should get you the result you want.

以下错误是由于对 udf 函数的错误定义,@ Shankar已在其答案中对其进行了纠正.

Following error was due to wrong definition of udf function which @Shankar had it corrected in his answer.

Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)

以下错误是由于 spark-mllib库 spark-core库 spark-sql库版本不匹配.它们都应该是相同的版本.

Following error is due to version mismatch of spark-mllib library with spark-core library and spark-sql library. They all should be of same version.

error: Caused by: org.apache.spark.SparkException: Failed to execute user defined function(anonfun$15: (int, int, string, string, int, int, int, int, int) => vector) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$Gen‌​eratedIterator.proce‌​ssNext(Unknown Source) 

我希望解释清楚,希望您的问题很快得到解决.

I hope the explanation is clear and hope to see your problem get solved soon.

已编辑

您还没有像@Shankar所建议的那样更改 udf 函数.也添加 .trim ,因为我可以看到一些空格

You haven't still changed the udf function as @Shankar had suggested. Add .trim too as I can see some spaces

val toVec9 = udf ((a: Int, b: Int, c: String, d: String, e: Int, f: Int, g: Int, h: Int, i: Int) =>
  {
  val e3 = c.trim match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  val e4 = d.trim match {
    case "10.0.0.1" => 1
    case "10.0.0.2" => 2
    case "10.0.0.3" => 3
  }
  Vectors.dense(a, b, e3, e4, e, f, g, h, i)
})

在查看依赖项时,您使用的是 %% ,它告诉 sbt 下载与 scala打包在一起的 dependencies .代码>版本在您的系统中.那应该没问题,但是由于您仍然遇到错误,因此我想将 dependencies 更改为

And looking at your dependencies, you are using %% which tells sbt to download the dependencies packaged with scala version in your system. That should be fine but since you are still getting errors, I would like to change the dependencies as

libraryDependencies ++= Seq(
  "co.theasi" %% "plotly" % "0.2.0",
  "org.apache.spark" % "spark-core_2.11" % "2.1.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.1.1",
  "org.apache.spark" %% "spark-hive" % "2.1.1",
  "org.apache.spark" % "spark-streaming_2.11" % "2.1.1",
  "org.apache.spark" % "spark-mllib_2.11" % "2.1.1"

)

这篇关于无法使用Scala在Apache Spark中执行用户定义的函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆