如何使用直接流在Kafka Spark Streaming中指定消费者组 [英] how to specify consumer group in Kafka Spark Streaming using direct stream

查看:38
本文介绍了如何使用直接流在Kafka Spark Streaming中指定消费者组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用直接流 API 为 kafka Spark 流指定消费者组 ID.

HashMapkafkaParams = new HashMap();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("auto.offset.reset", "最大");kafkaParams.put("group.id", "app1");JavaPairInputDStream消息 = KafkaUtils.createDirectStream(jssc,字符串类,字符串类,StringDecoder.class,StringDecoder.class,卡夫卡参数,主题集);

虽然我已经指定了配置,但不确定是否遗漏了什么.使用 spark1.3

kafkaParams.put("group.id", "app1");

解决方案

直接流 API 使用低级 Kafka API,因此无论如何不使用消费者组.如果您想在 Spark Streaming 中使用消费者组,则必须使用基于接收器的 API.

完整的细节可以在文档中找到!

How to specify consumer group id for kafka spark streaming using direct stream API.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, 
            String.class, 
            String.class,
            StringDecoder.class, 
            StringDecoder.class, 
            kafkaParams, 
            topicsSet
    );

though i have specified the configuration not sure if missing something. using spark1.3

kafkaParams.put("group.id", "app1");

解决方案

The direct stream API use the low level Kafka API, and as so doesn't use consumer groups in anyway. If you want to use consumer groups with Spark Streaming, you'll have to use the receiver based API.

Full details are available in the doc !

这篇关于如何使用直接流在Kafka Spark Streaming中指定消费者组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆