Spark-Kafka invalid dependency detected


Question

I have a basic Spark-Kafka program, and I am trying to run the following code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import Utilities._
object WordCount {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // List of topics you want to listen for from Kafka
    val topics = List("testLogs").toSet
    // Create our Kafka stream, which will contain (topic,message) pairs. We tack a
    // map(_._2) at the end in order to only get the messages, which contain individual
    // lines of data.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // Extract the request field from each log line
    val requests = lines.map(x => {val matcher:Matcher = pattern.matcher(x); if (matcher.matches()) matcher.group(5)})

    // Extract the URL from the request
    val urls = requests.map(x => {val arr = x.toString().split(" "); if (arr.size == 3) arr(1) else "[error]"})

    // Reduce by URL over a 5-minute window sliding every second
    val urlCounts = urls.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    // Sort and print the results
    val sortedResults = urlCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()

    // Kick it off
    ssc.checkpoint("/home/")
    ssc.start()
    ssc.awaitTermination()

  }


}

I am using the IntelliJ IDE and created the Scala project with sbt. The build.sbt file looks like this:

name := "Sample"

version := "1.0"

organization := "com.sundogsoftware"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)

However, when I try to build the code, it produces the following errors:

Error:scalac: missing or invalid dependency detected while loading class file 'StreamingContext.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'StreamingContext.class' was compiled against an incompatible version of org.apache.spark.

Error:scalac: missing or invalid dependency detected while loading class file 'DStream.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'DStream.class' was compiled against an incompatible version of org.apache.spark.

Answer

When using different Spark libraries together, their versions should always match. Here spark-core 2.2.0 sits on the classpath next to spark-streaming 1.4.1, whose classes (StreamingContext, DStream) were compiled against org.apache.spark.Logging, a trait that is no longer part of the public API in Spark 2.x; that is exactly what the "missing or invalid dependency" error is complaining about.

The version of the Kafka integration matters as well; for Spark 2.x it should be, for example, spark-streaming-kafka-0-10_2.11:

...
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)
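Note that the 0-10 integration exposes a different createDirectStream API than the old 0.8 one used in the question, so the stream-creation code has to change as well. A minimal sketch under the question's setup (same localhost:9092 broker and testLogs topic; the group.id here is an arbitrary example value):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer properties replace the old "metadata.broker.list" map
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "word-count-group",   // arbitrary example group id
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Each element is a ConsumerRecord, so .value() replaces the old map(_._2)
val lines = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](List("testLogs"), kafkaParams)
).map(_.value())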

https://search.maven.org/ is a useful site if you need to check the exact dependency coordinates you should use.
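For illustration, a Maven Central entry translates to an sbt line like this (the coordinates shown are the ones from the example above; %% appends the Scala binary suffix for you):

// groupId: org.apache.spark, artifactId: spark-streaming-kafka-0-10_2.11, version: 2.2.0
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
// equivalent form with the Scala suffix written out explicitly:
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"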

