反应程序在将所有消息发送到 Kafka 之前提前退出 [英] Reactive program exiting early before sending all messages to Kafka

查看:32
本文介绍了反应程序在将所有消息发送到 Kafka 之前提前退出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是之前反应式 kafka 问题的后续问题(将数据流发送到反应式 kafka 时出现的问题).

This is a subsequent question to a previous reactive kafka issue (Issue while sending the Flux of data to the reactive kafka).

我正在尝试使用反应式方法向 kafka 发送一些日志记录.这是使用反应式 kafka 发送消息的反应式代码.

I am trying to send some log records to the kafka using the reactive approach. Here is the reactive code sending messages using reactive kafka.

public class LogProducer {

    private final KafkaSender<String, String> sender;

    public LogProducer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        SenderOptions<String, String> senderOptions = SenderOptions.create(props);

        sender = KafkaSender.create(senderOptions);
    }

    public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {
    
        AtomicInteger sentCount = new AtomicInteger(0);
        sender.send(records
        .map(record -> {
            LogRecord lrec = record.getRecords().get(0);
            String id = lrec.getId();
            Thread.sleep(0, 5); // sleep for 5 ns
            return SenderRecord.create(new ProducerRecord<>(topic, id,
                    lrec.toString()), id);
        })).doOnNext(res -> sentCount.incrementAndGet()).then()
        .doOnError(e -> {
            log.error("[FAIL]: Send to the topic: '{}' failed. "
                    + e, topic);
        })
        .doOnSuccess(s -> {
            log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
        })
        .subscribe();
    }

}


public class ExecuteQuery implements Runnable {

    private LogProducer producer = new LogProducer("localhost:9092");

    @Override
    public void run() {
        Flux<Logs.Data> records = ...
        producer.sendMessages(kafkaTopic, records);
        .....
        .....
        // processing related to the messages sent
    }

}

因此,即使 Thread.sleep(0, 5); 存在,有时它也不会将所有消息发送到 kafka 并且程序存在早期打印 SUCCESS 消息(log.info("[SUCCESS]: {} 条记录发送到主题:'{}'", sentCount, topic);).有没有更具体的方法来解决这个问题.例如,使用某种回调,让线程等待所有消息发送成功.

So even when the Thread.sleep(0, 5); is there, sometimes it does not send all messages to kafka and the program exists early printing the SUCCESS message (log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);). Is there any more concrete way to solve this problem. For example, using some kind of callback, so that thread will wait for all messages to be sent successfully.

我有一个 spring 控制台应用程序并通过调度程序以固定速率运行 ExecuteQuery,就像这样

I have a spring console application and running ExecuteQuery through a scheduler at fixed rate, something like this

public class Main {

private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

public static void main(String[] args) {
     QueryScheduler scheduledQuery = new QueryScheduler();
     scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}

class QueryScheduler implements Runnable {

  @Override
  public void run() {
      // preprocessing related to time
      executor.execute(new ExecuteQuery());
      // postprocessing related to time
  }

}
}

推荐答案

Your Thread.sleep(0, 5);//sleep for 5 ns 没有任何值让主线程被阻塞,所以它在需要时退出,你的 ExecuteQuery 可能还没有完成它的工作.

Your Thread.sleep(0, 5); // sleep for 5 ns does not have any value for a main thread to be blocked, so it exits when it needs and your ExecuteQuery may not finish its job yet.

不清楚您如何启动您的应用程序,但我建议 Thread.sleep() 正好在主线程中进行阻塞.准确地说是public static void main(String[] args) { 方法impl.

It is not clear how you start your application, but I recommended Thread.sleep() exactly in a main thread to block. To be precise in the public static void main(String[] args) { method impl.

这篇关于反应程序在将所有消息发送到 Kafka 之前提前退出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆