主线程睡眠少于 1000 时无法产生消息 [英] Cannot produce Message when Main Thread sleep less than 1000

查看:31
本文介绍了主线程睡眠少于 1000 时无法产生消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当我使用Kafka的Java API时,如果我让我的主线程睡眠少于2000ns,它无法产生任何消息.我真的很想知道为什么会发生这种情况?

When I am using the Java API of Kafka,if I let my main Thread sleep less than 2000ns,it cannot prodece any message.I really want to know why this happen?

这是我的制作人:

public class Producer {
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public Producer(String topic, String[] args) {
        //......
        //......
        producer = new KafkaProducer<>(props);
        this.topic = topic;
    }

    public void producerMsg() throws InterruptedException {
        String data = "Apache Storm is a free and open source distributed";
        data = data.replaceAll("[\\pP‘’""]", "");
        String[] words = data.split(" ");
        Random _rand = new Random();

        Random rnd = new Random();
        int events = 10;
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            int lastIPnum = rnd.nextInt(255);
            String ip = "192.168.2." + lastIPnum;
            String msg = words[_rand.nextInt(words.length)];
            try {
                producer.send(new ProducerRecord<>(topic, ip, msg));
                System.out.println("Sent message: (" + ip + ", " + msg + ")");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer(Constants.TOPIC, args);
        producer.producerMsg();
        //If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
        Thread.sleep(2000);
    }
}

欣赏

推荐答案

你能展示你用来配置 Producer 的道具吗?我只是猜测有可能......

can you show the props you are using for configuring the Producer ? I'm only guessing that it's possible that ...

在 producerMsg() 中,您使用异步方式来使用生产者,因此只是 producer.send(),这意味着消息被放入内部缓冲区,用于制作稍后发送的批次.生产者有一个内部线程从缓冲区获取并发送批处理.也许只有 1000 毫秒不足以达到生产者真正发送消息的条件(参见 batch.size 和 linger.ms),主应用程序结束,生产者在没有发送消息的情况下死亡.给它更多时间(2000 毫秒),它会起作用.顺便说一句,我没有尝试代码.

In the producerMsg() you are using the async way to use the producer so just producer.send() which means that the message is put in an internal buffer for making batches that will be sent later. The producer has an internal thread to get from the buffer and sending the batch. Maybe that only 1000 ms aren't enough for reaching the condition where the producer really sends messages (see batch.size and linger.ms), the main application ends and the producer dies without sending messages. Giving it more time (2000 ms), it works. Btw, I didn't try the code.

所以原因似乎是你的:

props.put("linger.ms", 1000);

与您的睡眠相匹配.因此生产者将在 1000 毫秒后开始发送消息,因为批次尚未满(batch.size 为 16 MB).同时,主线程在休眠 1 秒后结束,生产者不发送消息.您必须使用较低的 linger.ms 时间.

that matches with your sleep. So the producer will start to send messages after 1000 ms, because the batch isn't already full (batch.size is 16 MB). At same time, the main thread ends after sleeping 1 secs and the producer doesn't send messages. You have to use a lower linger.ms time.

这篇关于主线程睡眠少于 1000 时无法产生消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆