Spark Dataset example: Unable to generate an encoder issue

Problem description

New to the Spark world, and trying out a Dataset example written in Scala that I found online.

When running it through SBT, I keep getting the following error:

org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class

Any idea what I am overlooking?

Also, feel free to point out a better way of writing the same Dataset example.

Thanks

sbt> runMain DatasetExample

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:306)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$3.applyOrElse(ExpressionEncoder.scala:302)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:259)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:258)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:249)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:302)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:90)
at org.apache.spark.sql.DataFrame.as(DataFrame.scala:209)
at DatasetExample$.main(DatasetExample.scala:45)
at DatasetExample.main(DatasetExample.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last sparkExamples/compile:runMain for the full output.
[error] (sparkExamples/compile:runMain) Nonzero exit code: 1
[error] Total time: 127 s, completed Oct 25, 2016 1:08:09 AM

Code:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object DatasetExample  {
   // Create data sets 
   case class Student(name: String, dept: String, age:Long )
   case class Department(abbrevName: String, fullName: String)

   org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this) // Not sure what exactly is the purpose

   def main(args: Array[String]) {
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)
      // initialise spark context
      val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
      val sc = new SparkContext(conf)
      val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

      import sqlcontext.implicits._   // Not sure what exactly is the purpose

      // Read JSON objects into a Dataset[Student].
      val students = sqlcontext.read.json("student.json").as[Student]
      students.show()

      // Select two columns and filter on one column.
      // Each argument of "select" must be a "TypedColumn".
      students.select($"name".as[String], $"dept".as[String]).
                   filter(_._2 == "Math").  // Filter on _2, the second selected column
                   collect()

      // Group by department and count each group.
      students.groupBy(_.dept).count().collect()

      // Group and aggregate in each group.
      students.groupBy(_.dept).
                  agg(avg($"age").as[Double]).
                  collect()

      // Initialize a Seq and convert to a Dataset.
      val depts = Seq(Department("CS", "Computer Science"), Department("Math", "Mathematics")).toDS()

      // Show the contents of the Dataset.
      depts.show()

      // Join two datasets with "joinWith".
      val joined = students.joinWith(depts, $"dept" === $"abbrevName")

      // Show the contents of the joined Dataset.
      // Note that the original objects are nested into tuples under the _1 and _2 columns.
      joined.show()

      // terminate spark context
      sc.stop()

      }        
}

JSON file (student.json):

{"id" : "1201", "name" : "Kris", "age" : "25"}
{"id" : "1202", "name" : "John", "age" : "28"}
{"id" : "1203", "name" : "Chet", "age" : "39"}
{"id" : "1204", "name" : "Mark", "age" : "23"}
{"id" : "1205", "name" : "Vic", "age" : "23"}

Answer

This line is what is causing the problem:

org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

This means that you are adding a new outer scope to this context, to be used when instantiating an inner class during deserialization.

Inner classes are created when a case class is defined inside the Spark REPL, and registering the outer scope that the class was defined in allows new instances to be created on the Spark executors.

In normal use (your case), you shouldn't need to call this function.

You'll also need to move your case classes outside of the DatasetExample object.
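
Below is a minimal sketch of what the restructured program could look like (an illustration based on the question's code and the Spark 1.6-era SQLContext API, not part of the original answer): the case classes are declared at the top level and the addOuterScope call is removed.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Top-level case classes: Spark can generate encoders for these without
// needing access to an enclosing scope.
case class Student(name: String, dept: String, age: Long)
case class Department(abbrevName: String, fullName: String)

object DatasetExample {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("SetsExamples").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new SQLContext(sc)
    import sqlcontext.implicits._  // encoders and the $"column" syntax

    // No addOuterScope call is needed once the case classes are top level.
    val students = sqlcontext.read.json("student.json").as[Student]
    students.show()

    sc.stop()
  }
}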

Note:

import sqlContext.implicits._ is a Scala-specific import that brings in the implicit methods for converting common Scala objects and RDDs into DataFrames.
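
For example (a small illustrative snippet, not from the original answer), with the implicits in scope a local Seq of case-class instances can be converted directly into a Dataset, and the $"columnName" syntax becomes available:

import sqlcontext.implicits._  // sqlcontext and Department as defined in the question's code

// The implicits add toDS()/toDF() to local Scala collections and RDDs,
// and provide the $"columnName" column syntax used in select/filter/agg.
val depts = Seq(Department("CS", "Computer Science")).toDS()
depts.show()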

More information can be found here.
