如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux) [英] How to use Micrometer Timer to record duration of async method (returns Mono or Flux)
本文介绍了如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想使用Micrometer记录异步方法最终何时发生的执行时间.有推荐的方法吗?
I'd like to use Micrometer to record the execution time of an async method when it eventually happens. Is there a recommended way to do this?
示例:Kafka回复模板.我想记录实际执行sendAndReceive调用(在请求主题上发送消息并在回复主题上接收响应)所花费的时间.
Example: Kafka Replying Template. I want to record the time it takes to actually execute the sendAndReceive call (sends a message on a request topic and receives a response on a reply topic).
public Mono<String> sendRequest(Mono<String> request) {
return request
.map(r -> new ProducerRecord<String, String>(requestsTopic, r))
.map(pr -> {
pr.headers()
.add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,
"reply-topic".getBytes()));
return pr;
})
.map(pr -> replyingKafkaTemplate.sendAndReceive(pr))
... // further maps, filters, etc.
类似
responseGenerationTimer.record(() -> replyingKafkaTemplate.sendAndReceive(pr)))
在这里不工作;它只是记录创建Supplier
所需的时间,而不是实际的执行时间.
won't work here; it just records the time that it takes to create the Supplier
, not the actual execution time.
推荐答案
您可以使用reactor.util.context.Context
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.hamcrest.Matchers.is;
public class TestMonoTimer {
private static final Logger LOG = LoggerFactory.getLogger(TestMonoTimer.class);
private static final String TIMER_SAMPLE = "TIMER_SAMPLE";
private static final Timer TIMER = new SimpleMeterRegistry().timer("test");
private static final AtomicBoolean EXECUTION_FLAG = new AtomicBoolean();
@Test
public void testMonoTimer() {
Mono.fromCallable(() -> {
Thread.sleep(1234);
return true;
}).transform(timerTransformer(TIMER))
.subscribeOn(Schedulers.parallel())
.subscribe(EXECUTION_FLAG::set);
Awaitility.await().atMost(2, TimeUnit.SECONDS).untilAtomic(EXECUTION_FLAG, is(true));
Assert.assertTrue(TIMER.totalTime(TimeUnit.SECONDS) > 1);
}
private static <T> Function<Mono<T>, Publisher<T>> timerTransformer(Timer timer) {
return mono -> mono
.flatMap(t -> Mono.subscriberContext()
.flatMap(context -> Mono.just(context.<Timer.Sample>get(TIMER_SAMPLE).stop(timer))
.doOnNext(duration -> LOG.info("Execution time is [{}] seconds",
duration / 1000000000D))
.map(ignored -> t)))
.subscriberContext(context -> context.put(TIMER_SAMPLE, Timer.start(Clock.SYSTEM)));
}
}
这篇关于如何使用千分尺计时器记录异步方法的持续时间(返回Mono或Flux)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文