自己的执行者替换默认的SimpleAsyncTaskExecutor的缺点和风险是什么 [英] what are the drawnbacks and risks replacing default SimpleAsyncTaskExecutor by own Executor

查看:612
本文介绍了自己的执行者替换默认的SimpleAsyncTaskExecutor的缺点和风险是什么的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

个人知识:我从 javacodegeeks阅读:"... SimpleAsyncTaskExecutor可以用于玩具项目,但对于任何大于此的项目都存在一定的风险,因为它不限制并发线程且不重用线程.为了安全起见,我们还将添加一个任务执行器bean ..."和 baeldung 的简单示例,说明如何添加我们自己的任务执行器.但是我可以找到任何指导来解释产生的后果以及一些值得应用的案例.

Personal knowlegedment: I read from javacodegeeks: "... SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it’s a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean... " and from baeldung a very simple example how to add our own Task Executor. But I can find any guidance explaining what are the consequences and some worth cases to apply it.

个人愿望:我正在努力为将要发布的有关Kafka主题的微服务日志提供一个企业架构.似乎合理的说法是由于不限制并发线程的使用和不重用而引起的风险",这主要是针对我的基于日志的情况.

Personal desire: I am working hard to provide a corporative architecture for our microservices logs be publish on Kafka topics. It seems reasonble the statement " risky caused by not limit concurrent threads and not reuse it" mainly for my case that is based on logs.

我在本地桌面成功运行了以下代码,但是我想知道我是否正确提供了一个自定义Task Executor.

I am running the bellow code succesfully in local desktop but I am wondering if I am providing a custom Task Executor properly.

我的问题:考虑到我已经在使用kafkatempla(即,至少同步同步,单例和线程安全,至少就产生/发送消息而言,默认情况下是这样),这种配置是否确实朝着正确的方向重复使用线程和避免在使用SimpleAsyncTaskExecutor时意外传播线程创建?

My question: does this configuration bellow taking in account I am already using kafkatempla (i.e. syncronized, singleton and thread safe by default at least for producing/sending messsage as far as understand it) really going in right direction to reuse threads and avoid spread accidentally threads creation while using SimpleAsyncTaskExecutor?

生产者配置

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

}

制作人

@Service
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Async
    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(final SendResult<String, String> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

出于演示目的:

@SpringBootApplication
public class KafkaDemoApplication  implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);

    }

    @Autowired
    private Producer p;

    @Override
    public void run(String... strings) throws Exception {
        p.send("test", " qualquer messagem demonstrativa");
    }

}

推荐答案

这是SimpleAsyncTaskExecutor

protected void doExecute(Runnable task) {
    Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
    thread.start();
}

为每个任务创建新线程,在Java中创建线程并不便宜:(

New thread is created for every task, thread creation in Java is not cheap: (Reference)

线程对象占用大量内存,在大型应用程序中,分配和取消分配许多线程对象会产生大量内存管理开销.

Thread objects use a significant amount of memory, and in a large-scale application, allocating and deallocating many thread objects creates a significant memory management overhead.

=>使用此任务执行程序重复执行任务将对应用程序性能产生负面影响(此外,默认情况下,此执行程序不会限制并发任务的数量)

=> Repeatedly execute task with this task executor will negatively affect application performance (moreover, this executor by default does not limit the number of concurrent tasks)

这就是为什么建议您使用线程池实现的原因,线程创建开销仍然存在,但是由于线程被重用而不是create-fire-forget,因此大大减少了.

That's why you're advised to use a thread pool implementation, the thread creation overhead is still there but significantly reduced due to threads are reused instead of create-fire-forget.

在配置ThreadPoolTaskExecutor时,应根据您的应用程序负载正确定义两个值得注意的参数:

When configure ThreadPoolTaskExecutor, two notable parameters should be defined properly according to your application load:

  1. private int maxPoolSize = Integer.MAX_VALUE;

这是池中的最大线程数.

This is maximum number of threads in the pool.

private int queueCapacity = Integer.MAX_VALUE;

这是排队的最大任务数.当队列已满时,默认值可能会导致OutOfMemory异常.

This is maximum number of tasks queued. Default value may cause OutOfMemory exception when the queue is full.

使用默认值(Integer.MAX_VALUE)可能会导致资源不足/服务器崩溃.

Using default value (Integer.MAX_VALUE) may lead to out of resource / crash in your server.

您可以通过增加最大池大小setMaxPoolSize()的数量来提高吞吐量,以减少加载增加时的预热,将核心池大小设置为更高的值setCorePoolSize()(负载增加)

You can improve the thoughput by increasing number of maximum poolsize setMaxPoolSize(), to reduce the warm up when loading increase, set core poolsize to higher value setCorePoolSize() (any number of threads different between maxPoolSize - corePoolSize will be initilized when load increase)

这篇关于自己的执行者替换默认的SimpleAsyncTaskExecutor的缺点和风险是什么的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆