Spark-Kafka invalid dependency detected

Problem description

I have a basic Spark-Kafka program, and I am trying to run the following code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import Utilities._
object WordCount {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // List of topics you want to listen for from Kafka
    val topics = List("testLogs").toSet
    // Create our Kafka stream, which will contain (topic,message) pairs. We tack a
    // map(_._2) at the end in order to only get the messages, which contain individual
    // lines of data.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // Extract the request field from each log line
    val requests = lines.map(x => {val matcher:Matcher = pattern.matcher(x); if (matcher.matches()) matcher.group(5)})

    // Extract the URL from the request
    val urls = requests.map(x => {val arr = x.toString().split(" "); if (arr.size == 3) arr(1) else "[error]"})

    // Reduce by URL over a 5-minute window sliding every second
    val urlCounts = urls.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    // Sort and print the results
    val sortedResults = urlCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()

    // Kick it off
    ssc.checkpoint("/home/")
    ssc.start()
    ssc.awaitTermination()

  }


}

I am using the IntelliJ IDE and created the Scala project with sbt. The details of the build.sbt file are as follows:

name := "Sample"

version := "1.0"

organization := "com.sundogsoftware"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)

However, when I try to build the code, it produces the following errors:

Error:scalac: missing or invalid dependency detected while loading class file 'StreamingContext.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'StreamingContext.class' was compiled against an incompatible version of org.apache.spark.

Error:scalac: missing or invalid dependency detected while loading class file 'DStream.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'DStream.class' was compiled against an incompatible version of org.apache.spark.

Solution

When using different Spark libraries together, the versions of all of them should always match. In the build above, spark-streaming and spark-streaming-kafka 1.4.1 were compiled against Spark 1.x, where org.apache.spark.Logging was a public trait; it is no longer public in Spark 2.x, which is exactly what the "Could not access type Logging" errors are complaining about.

The Kafka integration artifact you use also matters; for Kafka 0.10+ brokers with Scala 2.11, it should be, for example, spark-streaming-kafka-0-10_2.11.

...
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)
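
Before rebuilding, it can also help to verify which versions actually end up on the classpath. sbt's built-in evicted task reports dependencies that were replaced by a newer version (in the original build, for example, the spark-core 1.4.1 pulled in transitively by spark-streaming 1.4.1 is evicted by the directly declared 2.2.0), which is usually where this kind of "missing or invalid dependency" error comes from:

sbt evicted
sbt clean compile

Depending on your sbt version, a dependencyTree task (built into newer sbt releases, or provided by the sbt-dependency-graph plugin on older ones) gives a fuller view of the transitive dependencies.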

This is a useful site if you need to check the exact dependencies you should use: https://search.maven.org/
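
Note that spark-streaming-kafka-0-10 also has a different API from the 0.8 integration used in the question: kafka.serializer.StringDecoder is gone, and KafkaUtils.createDirectStream now takes a location strategy plus a consumer strategy and yields ConsumerRecord objects. Below is a minimal sketch of the stream setup under the new API, assuming the same localhost:9092 broker and testLogs topic (the object name and group.id are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaExample010 {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    // The 0-10 integration uses the new Kafka consumer, configured with
    // bootstrap.servers instead of metadata.broker.list
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "kafka-example",               // placeholder consumer group
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = List("testLogs")

    // createDirectStream yields ConsumerRecord objects, so take .value()
    // instead of mapping over (topic, message) tuples
    val lines = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The log-parsing, windowing, and sorting logic from the question can stay the same once lines is built this way.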
