Delay in Consumer consuming messages in Apache Kafka


Problem Description

I am using Kafka 0.8.0 and trying to achieve the scenario described below.

JCA API (acts as the producer and sends the data) -----> Consumer ------> HBase

I am sending each message to the consumer as soon as I fetch the data using the JCA client. For instance, as soon as the producer sends message no. 1, I want to fetch the same from the consumer and 'put' it in HBase. But my consumer starts fetching the messages only after some random n messages. I want to put the producer and consumer in sync so that both of them start working together.

I have used:

1 broker

1 topic

1 single producer and a high-level consumer

Can anyone suggest what I need to do to achieve this?

Adding some relevant code snippets.

Consumer.java

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

        this.topic = topic;
        topicCountMap.put(topic, new Integer(1));
        // StringDecoder decodes the keys; the custom org.bigdata.kafka.Serializer
        // acts as the Decoder<Signal> for the message values.
        consumerMap = consumer.createMessageStreams(topicCountMap, kd, new Serializer(
                new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");

        return new ConsumerConfig(props);
    }

    @Override
    public synchronized void run() {
        // Block on the stream iterator and print each message as it arrives.
        while (it.hasNext()) {
            t = it.next().message().getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}

Producer.java

import java.util.Properties;

import kafka.producer.ProducerConfig;

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic)
    {
        // Values (user-defined Signal objects) are serialized with the custom
        // org.bigdata.kafka.Serializer; keys are plain strings. The default
        // (random) partitioner is used.
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}

KafkaProperties.java

public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}

This is how the consumer behaves: there is no System.out output for the first eleven messages the producer sends, and only from msg12 onward (see the log below) does it start functioning normally.

     producer sending msg1
     producer sending msg2
     producer sending msg3
     producer sending msg4
     producer sending msg5
     producer sending msg6
     producer sending msg7
     producer sending msg8
     producer sending msg9
     producer sending msg10
     producer sending msg11
     producer sending msg12
     In Consumer received msg12
     producer sending msg13
     In Consumer received msg13
     producer sending msg14
     In Consumer received msg14
     producer sending msg15
     In Consumer received msg15
     producer sending msg16
     In Consumer received msg16
     producer sending msg17
     In Consumer received msg17
     producer sending msg18
     In Consumer received msg18
     producer sending msg19
     In Consumer received msg19
     producer sending msg20
     In Consumer received msg20
     producer sending msg21
     In Consumer received msg21

This is the listener function I added at the point where the producer sends messages to the consumer. Also, I am using the default producer configuration and have not overridden it.

public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR carried by the monitor event.
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();

        // Forward the update to Kafka as a Signal message.
        producer1.producer.send(new KeyedMessage<String, Signal>(
                KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

Answer

1. Try adding props.put("request.required.acks", "1") to the producer configuration. By default the producer does not wait for acks, and message delivery is not guaranteed. So, if you start the broker just before your test, the producer may start sending messages before the broker is fully initialized, and the first several messages may be lost.
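Applied to the Producer constructor from the question, this is a one-line addition (a sketch reusing the question's own Signal and Serializer classes; only the request.required.acks line is new):

public Producer(String topic)
{
    props.put("serializer.class", "org.bigdata.kafka.Serializer");
    props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "localhost:9092");
    // Wait for the partition leader to acknowledge each message.
    // The 0.8 default is "0" (fire-and-forget), which can silently
    // lose the first messages while the broker is still initializing.
    props.put("request.required.acks", "1");
    producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
    this.topic = topic;
}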

2. Try adding props.put("auto.offset.reset", "smallest") to the consumer configuration. It is equivalent to the --from-beginning option of kafka-console-consumer.sh. If your consumer starts later than the producer and there is no offset data saved in ZooKeeper, then by default it will start consuming only new messages (see Consumer configs in the docs).
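In the createConsumerConfig() method from the question, this is likewise a one-line addition (a sketch; only the auto.offset.reset line is new):

private static ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    props.put("fetch.size", "1024");
    // Start from the earliest available offset when this consumer group has
    // no committed offset in ZooKeeper. The default, "largest", reads only
    // messages produced after the consumer connects.
    props.put("auto.offset.reset", "smallest");
    return new ConsumerConfig(props);
}

Note that auto.offset.reset only takes effect when the group has no committed offset; once offsets exist in ZooKeeper, the consumer resumes from them regardless of this setting.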

