群组成员支持的协议与现有成员的协议不兼容 [英] The group member's supported protocols are incompatible with those of existing members
问题描述
我遇到了与 Kafka 相关的问题.
I'm facing an issue related to Kafka.
我的当前服务 (Producer
) 将消息发送到 Kafka 主题 (events
).该服务使用的是用 Java 编写的 kafka_2.12 v1.0.0
.
I'm having my current service (Producer
) that sends the message to a Kafka topic (events
). The service is using kafka_2.12 v1.0.0
, written in Java.
我正在尝试将它与 spark-streaming
的示例项目集成为 Consumer
服务(此处 使用 kafka_2.11 v0.10.0,用 Scala 编写)
I'm trying to integrate it with the sample project of spark-streaming
as a Consumer
service (here using kafka_2.11 v0.10.0, written in Scala)
消息从Producer
成功发送到Kafka主题.但是,我总是收到以下错误堆栈:
The message is sent successfully from Producer
to the Kafka topic. However, I always receive the error stack below:
Exception in thread "main" org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members.
at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571) at com.jj.streaming.ItemApp$.delayedEndpoint$com$jj$streaming$ItemApp$1(ItemApp.scala:72)
at com.jj.streaming.ItemApp$delayedInit$body.apply(ItemApp.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76)
at com.jj.streaming.ItemApp$.main(ItemApp.scala:12)
at com.jj.streaming.ItemApp.main(ItemApp.scala)
我不知道根本原因.我该如何解决这个问题?
I don't know the root cause. How can I fix this?
推荐答案
当我尝试将消费者添加到使用与之前不同的分区分配策略的集群时,会在我的配置中发生这种情况.
This happens in my configuration when I attempt to add a consumer to the cluster that is using a different partition assignment strategy from the previous ones.
例如:
partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RandomAccessAssignor
混合或默认为:
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor
这篇关于群组成员支持的协议与现有成员的协议不兼容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!