如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux) [英] How to use Micrometer Timer to record duration of async method (returns Mono or Flux)

查看:389
本文介绍了如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用Micrometer记录异步方法最终何时发生的执行时间.有推荐的方法吗?

I'd like to use Micrometer to record the execution time of an async method when it eventually happens. Is there a recommended way to do this?

示例:Kafka回复模板.我想记录实际执行sendAndReceive调用(在请求主题上发送消息并在回复主题上接收响应)所花费的时间.

Example: Kafka Replying Template. I want to record the time it takes to actually execute the sendAndReceive call (sends a message on a request topic and receives a response on a reply topic).

    public Mono<String> sendRequest(Mono<String> request) {
        return request
            .map(r -> new ProducerRecord<String, String>(requestsTopic, r))
            .map(pr -> {
                pr.headers()
                        .add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
                                "reply-topic".getBytes()));
                return pr;
            })
            .map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
            ... // further maps, filters, etc.

类似

responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))

在这里不工作;它只是记录创建Supplier所需的时间,而不是实际的执行时间.

won't work here; it just records the time that it takes to create the Supplier, not the actual execution time.

推荐答案

您可以使用reactor.util.context.Context

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static org.hamcrest.Matchers.is;

public class TestMonoTimer {
    private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);

    private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
    private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
    private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();

    @Test
    public void testMonoTimer() {
        Mono.fromCallable(() -> {
            Thread.sleep(1234);
            return true;
        }).transform(timerTransformer(TIMER))
                .subscribeOn(Schedulers.parallel())
                .subscribe(EXECUTION_FLAG::set);

        Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
        Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
    }

    private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
        return mono -> mono
                .flatMap(t -> Mono.subscriberContext()
                        .flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
                                .doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
                                        duration / 1000000000D))
                                .map(ignored -> t)))
                .subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
    }
}

这篇关于如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆