2 Spark Streaming jobs with the same consumer group ID


Problem description

I am experimenting with Kafka consumer groups.

Here is my code snippet:

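import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public final class App {

    // Batch interval in milliseconds
    private static final int INTERVAL = 5000;

    public static void main(String[] args) throws Exception {

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        kafkaParams.put("retries", "3");
        kafkaParams.put(GROUP_ID_CONFIG, "mygroup");
        kafkaParams.put("request.timeout.ms", "210000");
        kafkaParams.put("session.timeout.ms", "180000");
        kafkaParams.put("heartbeat.interval.ms", "3000");
        Collection<String> topics = Arrays.asList("venkat4");

        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));

        // Subscribe lets Kafka assign partitions through consumer group management
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Map each record to a (key, value) pair and print a sample of every batch
        stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}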

When I run two instances of this Spark Streaming job concurrently, it fails with the error:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition venkat4-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)

Per this https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html, creating a separate instance of a Kafka consumer with the same group triggers a rebalance of partitions. I believe the consumer is not tolerating the rebalance. How should I fix this?

Below is the command used:

SPARK_KAFKA_VERSION=0.10 spark2-submit \
  --num-executors 2 \
  --master yarn \
  --deploy-mode client \
  --files jaas.conf#jaas.conf,hive.keytab#hive.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
  --class Streaming.App \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
  --conf spark.streaming.kafka.consumer.cache.enabled=false \
  1-1.0-SNAPSHOT.jar

Solution

With a single consumer, all partitions are consumed by that one consumer. If the data ingestion rate is high, the consumer may not be able to keep up with it.

Adding more consumers to the same consumer group lets them share the topic's partitions and increases the consumption rate. Spark Streaming uses this approach to achieve 1:1 parallelism between Kafka partitions and Spark partitions, and Spark handles it internally.

If you have more consumers than topic partitions, the extra consumers sit idle and resources are under-utilized. It is always recommended that the number of consumers be less than or equal to the partition count.
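To make the idle-consumer point concrete, here is a minimal sketch in plain Java that hands out partitions to the consumers of one group in round-robin order. It is a simplification for illustration only: Kafka's real assignors work differently in detail, and the class name `GroupAssignmentSketch`, the method `assign`, and the counts (3 partitions, 5 consumers) are all assumptions, not taken from the question.

```java
import java.util.ArrayList;
import java.util.List;

final class GroupAssignmentSketch {

    // Distributes partition ids 0..partitionCount-1 over the consumers, round-robin.
    // Element i of the result is the list of partitions owned by consumer i.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            assignment.get(p % consumerCount).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 3 partitions shared by 5 consumers in one group.
        List<List<Integer>> assignment = assign(3, 5);
        for (int c = 0; c < assignment.size(); c++) {
            List<Integer> owned = assignment.get(c);
            System.out.println("consumer-" + c + " -> "
                    + (owned.isEmpty() ? "idle" : owned.toString()));
        }
    }
}
```

With these numbers, consumers 3 and 4 receive no partition at all, which is the under-utilization described above.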

Kafka rebalances the group whenever more consumer processes or threads are added. A rebalance is also triggered when a consumer stops sending heartbeats: to ZooKeeper with the old consumer, or to the broker-side group coordinator with the new consumer API that the kafka-0-10 integration uses.

Kafka also rebalances partitions whenever a broker fails or a new partition is added to an existing topic. How data is balanced across partitions on the brokers is specific to Kafka.

Spark Streaming provides simple 1:1 parallelism between Kafka partitions and Spark partitions. If you do not provide any partition details using ConsumerStrategies.Assign, the stream consumes from all partitions of the given topic.

Kafka assigns the partitions of a topic to the consumers in a group so that each partition is consumed by exactly one consumer in the group. Kafka guarantees that a message is only ever read by a single consumer in the group.

When you start the second Spark Streaming job, another consumer with the same group.id tries to consume the same partitions, so the group rebalances. Judging from the stack trace, the driver of the job that lost partition venkat4-1 then calls seekToEnd on it anyway, which throws the error.
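The failure mode can be reproduced with a toy model in plain Java. This is only a sketch of the sequence of events, not Kafka's or Spark's actual code: `ToyConsumer`, `setOf`, and `simulate` are hypothetical names, and a two-partition topic is assumed because the stack trace only tells us that partition venkat4-1 exists.

```java
import java.util.Set;
import java.util.TreeSet;

final class RebalanceFailureSketch {

    /** Toy stand-in for a Kafka consumer: it only remembers which partitions it owns. */
    static final class ToyConsumer {
        private final Set<Integer> assigned = new TreeSet<>();

        // Called whenever the group hands this consumer a new assignment.
        void onAssignment(Set<Integer> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
        }

        // Mimics the check behind the error: seeking on a partition we do not own fails.
        void seekToEnd(Set<Integer> partitions) {
            for (int p : partitions) {
                if (!assigned.contains(p)) {
                    throw new IllegalStateException(
                            "No current assignment for partition venkat4-" + p);
                }
            }
        }
    }

    static Set<Integer> setOf(int... ids) {
        Set<Integer> result = new TreeSet<>();
        for (int id : ids) {
            result.add(id);
        }
        return result;
    }

    // Returns the error message seen after the simulated rebalance, or null if none.
    static String simulate() {
        ToyConsumer firstJob = new ToyConsumer();
        firstJob.onAssignment(setOf(0, 1));          // first job owns both partitions
        Set<Integer> trackedByDriver = setOf(0, 1);  // offsets the driver keeps tracking
        firstJob.seekToEnd(trackedByDriver);         // works: both partitions are owned

        firstJob.onAssignment(setOf(0));             // second job joins, partition 1 moves away
        try {
            firstJob.seekToEnd(trackedByDriver);     // driver still asks for partition 1
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate());
        // prints "No current assignment for partition venkat4-1"
    }
}
```

The point of the sketch is that the error does not come from the rebalance itself but from continuing to use a partition list that was valid only before the rebalance.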

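import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

// Subscribe consumes from all partitions of the listed topics
val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))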

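If you want each Spark job to consume specific partitions, use the following code.

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// This job reads only partition 1 of "topic"
val topicPartitionsList = List(new TopicPartition("topic", 1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign[String, String](topicPartitionsList, kafkaParams))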

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

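Consumers can join a group by using the same group.id.

// A second job reads partitions 3 and 4 of the same topic
val topicPartitionsList = List(new TopicPartition("topic", 3), new TopicPartition("topic", 4))

val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign[String, String](topicPartitionsList, kafkaParams))

This adds two more consumers under the same group.id. Because each job is assigned different partitions, the jobs do not compete for the same partitions.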
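When partitions are assigned by hand like this, nothing stops two jobs from listing the same partition, and any partition that no job lists is simply not read. The following plain-Java sketch checks a partition plan for both problems before the jobs are submitted. The class `AssignPlanCheck`, its methods, and the assumed total of 5 partitions are hypothetical; only the two partition lists (1, and 3 with 4) come from the snippets above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class AssignPlanCheck {

    // Partitions that are listed more than once anywhere in the plan.
    static SortedSet<Integer> overlapping(List<List<Integer>> plan) {
        Set<Integer> seen = new HashSet<>();
        SortedSet<Integer> duplicates = new TreeSet<>();
        for (List<Integer> job : plan) {
            for (Integer partition : job) {
                if (!seen.add(partition)) {
                    duplicates.add(partition);
                }
            }
        }
        return duplicates;
    }

    // Partitions in 0..partitionCount-1 that no job in the plan reads.
    static SortedSet<Integer> unassigned(int partitionCount, List<List<Integer>> plan) {
        SortedSet<Integer> missing = new TreeSet<>();
        for (int p = 0; p < partitionCount; p++) {
            missing.add(p);
        }
        for (List<Integer> job : plan) {
            missing.removeAll(job);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Job 1 reads partition 1; job 2 reads partitions 3 and 4 (as in the snippets above).
        List<List<Integer>> plan = Arrays.asList(Arrays.asList(1), Arrays.asList(3, 4));
        int partitionCount = 5; // assumption: the topic has partitions 0..4

        System.out.println("overlapping: " + overlapping(plan));
        System.out.println("unassigned: " + unassigned(partitionCount, plan));
    }
}
```

Under the 5-partition assumption, the plan has no overlap but leaves partitions 0 and 2 unread, so a third job (or longer lists) would be needed to cover the whole topic.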

Please read the Spark-Kafka integration guide. https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Hope this helps.
