java.lang.ClassNotFoundException: org.apache.spark.sql.DataFrame error when running Scala MongoDB connector

Problem Description

I am trying to run a Scala example with SBT to read data from MongoDB. I am getting this error whenever I try to access the data read from Mongo into the RDD.

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/spark/sql/DataFrame
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1431)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:72)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)

I have imported the Dataframe explicitly, even though it is not used in my code. Can anyone help with this issue?

My code:

package stream

import org.apache.spark._
import org.apache.spark.SparkContext._
import com.mongodb.spark._
import com.mongodb.spark.config._
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object SpaceWalk {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("SpaceWalk")
      .setMaster("local[*]")
      .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/nasa.eva")
      .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/nasa.astronautTotals")

    val sc = new SparkContext(sparkConf)
    val rdd = sc.loadFromMongoDB()

    def breakoutCrew(document: Document): List[(String, Int)] = {
      println("INPUT" + document.get("Duration").asInstanceOf[String])
      var minutes = 0
      val timeString = document.get("Duration").asInstanceOf[String]
      if (timeString != null && !timeString.isEmpty) {
        val time = document.get("Duration").asInstanceOf[String].split(":")
        minutes = time(0).toInt * 60 + time(1).toInt
      }

      import scala.util.matching.Regex
      val pattern = new Regex("(\\w+\\s\\w+)")
      val names = pattern findAllIn document.get("Crew").asInstanceOf[String]
      var tuples: List[(String, Int)] = List()
      for (name <- names) { tuples = tuples :+ ((name, minutes)) }

      return tuples
    }

    val logs = rdd.flatMap(breakoutCrew).reduceByKey((m1: Int, m2: Int) => (m1 + m2))

    //logs.foreach(println)

    def mapToDocument(tuple: (String, Int)): Document = {
      val doc = new Document()
      doc.put("name", tuple._1)
      doc.put("minutes", tuple._2)

      return doc
    }

    val writeConf = WriteConfig(sc)
    val writeConfig = WriteConfig(Map("collection" -> "astronautTotals", "writeConcern.w" -> "majority", "db" -> "nasa"), Some(writeConf))

    logs.map(mapToDocument).saveToMongoDB(writeConfig)

    import org.apache.spark.sql.SQLContext
    import com.mongodb.spark.sql._
    import org.apache.spark.sql.DataFrame

    // load the first dataframe "EVAs"
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val evadf = sqlContext.read.mongo()
    evadf.printSchema()
    evadf.registerTempTable("evas")

    // load the 2nd dataframe "astronautTotals"
    val astronautDF = sqlContext.read.option("collection", "astronautTotals").mongo[astronautTotal]()
    astronautDF.printSchema()
    astronautDF.registerTempTable("astronautTotals")

    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes FROM astronautTotals").show()

    sqlContext.sql("SELECT astronautTotals.name, astronautTotals.minutes, evas.Vehicle, evas.Duration FROM " +
      "astronautTotals JOIN evas ON astronautTotals.name LIKE evas.Crew").show()
  }
}

case class astronautTotal(name: String, minutes: Integer)

This is my sbt file -

name := "Project"
version := "1.0"    
scalaVersion := "2.11.7"    
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0"
//libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.2.1"
libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "0.1"  

addCommandAlias("c1", "run-main stream.SaveTweets")
addCommandAlias("c2", "run-main stream.SpaceWalk")

outputStrategy := Some(StdoutOutput)
//outputStrategy := Some(LoggedOutput(log: Logger))    
fork in run := true

Recommended Answer

This error occurs because you are using an incompatible library: mongo-spark-connector 0.1 (as in your sbt file) only supports Spark 1.x. You should use mongo-spark-connector 2.0.0+ instead. See: https://docs.mongodb.com/spark-connector/v2.0/
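As a reference, a minimal sketch of the build change, assuming the same Spark 2.0.0 / Scala 2.11 setup as in the question's sbt file (the 2.0.x connector artifacts are published for Scala 2.11):

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"

With the 2.x connector, the DataFrame part of the code is usually driven through a SparkSession rather than a SQLContext. A rough sketch of what the loading step could look like, reusing the URIs from the question (MongoSpark.load is the 2.x entry point for loading a collection as a DataFrame):

import org.apache.spark.sql.SparkSession
import com.mongodb.spark._

// Build a SparkSession carrying the connector's input/output URIs
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SpaceWalk")
  .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/nasa.eva")
  .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/nasa.astronautTotals")
  .getOrCreate()

// Load the input collection ("nasa.eva") as a DataFrame and inspect its schema
val evadf = MongoSpark.load(spark)
evadf.printSchema()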
