java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access


Problem description


I have a Scala Spark Streaming application that receives data from the same topic from 3 different Kafka producers.

The Spark Streaming application runs on the machine with host 0.0.0.179, the Kafka server on the machine with host 0.0.0.178, and the Kafka producers on the machines 0.0.0.180, 0.0.0.181 and 0.0.0.182.

When I try to run the Spark Streaming application, I get the error below:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Now I have read thousands of different posts, but no one seems to have found a solution to this issue.

How can I handle this in my application? Do I have to modify some parameters on Kafka (at the moment the num.partitions parameter is set to 1)?

Following is the code of my application:

// Imports required by this snippet (not shown in the original post; json4s Jackson backend assumed)
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

// Create the context with a 3 second batch size
val sparkConf = new SparkConf()
  .setAppName("SparkScript")
  .set("spark.driver.allowMultipleContexts", "true")
  .set("spark.streaming.concurrentJobs", "3")
  .setMaster("local[4]")
val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(3))

// Case classes describing the JSON payload produced by the sensors
case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)

case class Datas1(location: Location, timestamp: String, windspeed: Double, direction: String, strenght: String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "0.0.0.178:9092",
  "key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
  "group.id" -> "test_luca",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics1 = Array("topics1")

// Read from Kafka and parse each record's JSON value into a Sensors1
val s1 = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)
  ).map { record =>
    implicit val formats = DefaultFormats
    parse(record.value).extract[Sensors1]
  }

s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()

Thank you

Solution

Your problem is here:

s1.print()
s1.saveAsTextFiles("results/", "")

Spark creates a graph of flows, and you have defined two independent flows here:

Read from Kafka -> Print to console
Read from Kafka -> Save to text file

Spark will attempt to run both of these graphs concurrently, since they are independent of each other. And because the Kafka integration caches consumers, both executions effectively try to use the same consumer at the same time, which is exactly what the ConcurrentModificationException is complaining about.

What you can do is cache the DStream before running the two queries:

val dataFromKafka = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)
  ).map(/* stuff */)

val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
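
Applied to the code from the question, the fix is simply to cache s1 before the two output operations. This is a minimal sketch reusing the names defined in the question (cachedS1 is just an arbitrary name introduced here):

// Build the stream once, exactly as in the question
val s1 = KafkaUtils.createDirectStream[String, String](
    ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)
  ).map { record =>
    implicit val formats = DefaultFormats
    parse(record.value).extract[Sensors1]
  }

// Cache the parsed records so that both output operations below read the
// persisted RDDs instead of each pulling from the shared Kafka consumer
val cachedS1 = s1.cache()
cachedS1.print()
cachedS1.saveAsTextFiles("results/", "")

ssc.start()
ssc.awaitTermination()

With the cache in place, each batch is read from Kafka only once per interval; the print and the text-file save then both work from the persisted data rather than triggering a second pass over the cached consumer.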
