当主线程睡眠少于1000时无法生成消息 [英] Cannot produce Message when Main Thread sleep less than 1000

查看:126
本文介绍了当主线程睡眠少于1000时无法生成消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我使用Kafka的Java API时,如果我的主线程睡眠少于2000ns,它将无法发出任何消息.我真的想知道为什么会发生这种情况吗?

When I am using the Java API of Kafka,if I let my main Thread sleep less than 2000ns,it cannot prodece any message.I really want to know why this happen?

这是我的制片人:

public class Producer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public Producer(String topic, String[] args) {
        //......
        //......
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void producerMsg() throws InterruptedException {
        String data = "Apache Storm is a free and open source distributed";
        data = data.replaceAll("[\\pP‘’""]", "");
        String[] words = data.split(" ");
        Random _rand = new Random();

        Random rnd = new Random();
        int events = 10;
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            int lastIPnum = rnd.nextInt(255);
            String ip = "192.168.2." + lastIPnum;
            String msg = words[_rand.nextInt(words.length)];
            try {
                producer.send(new ProducerRecord<>(topic, ip, msg));
                System.out.println("Sent message: (" + ip + ", " + msg + ")");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer(Constants.TOPIC, args);
        producer.producerMsg();
        //If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
        Thread.sleep(2000);
    }
}

赞赏

推荐答案

您能显示用于配置Producer的道具吗?我只是在猜测...

can you show the props you are using for configuring the Producer ? I'm only guessing that it's possible that ...

在producerMsg()中,您使用的是异步方式来使用生产者,所以只需使用producer.send(),这意味着将消息放入内部缓冲区中以进行批量处理,然后将其发送.生产者有一个内部线程可以从缓冲区中获取并发送批处理.也许仅1000毫秒不足以达到生产者真正发送消息的条件(请参阅batch.size和linger.ms),主应用程序结束并且生产者死而没有发送消息.给它更多的时间(2000毫秒),它可以工作.顺便说一句,我没有尝试代码.

In the producerMsg() you are using the async way to use the producer so just producer.send() which means that the message is put in an internal buffer for making batches that will be sent later. The producer has an internal thread to get from the buffer and sending the batch. Maybe that only 1000 ms aren't enough for reaching the condition where the producer really sends messages (see batch.size and linger.ms), the main application ends and the producer dies without sending messages. Giving it more time (2000 ms), it works. Btw, I didn't try the code.

所以原因似乎是您的:

props.put("linger.ms", 1000);

与您的睡眠相匹配.因此,生产者将在1000毫秒后开始发送消息,因为批处理尚未满(batch.size为16 MB).同时,主线程在休眠1秒后结束,并且生产方不发送消息.您必须使用更短的linger.ms时间.

that matches with your sleep. So the producer will start to send messages after 1000 ms, because the batch isn't already full (batch.size is 16 MB). At same time, the main thread ends after sleeping 1 secs and the producer doesn't send messages. You have to use a lower linger.ms time.

这篇关于当主线程睡眠少于1000时无法生成消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆