在Spring AMQP中正确使用Spring WebClient [英] Correct way of using spring webclient in spring amqp

查看:0
本文介绍了在Spring AMQP中正确使用Spring WebClient的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用Rabbitmq-

消息的Spring AMQP应用程序的技术堆栈
Spring boot 2.2.6.RELEASE
Reactor Netty 0.9.12.RELEASE
Reactor Core 3.3.10.RELEASE

应用程序部署在四核RHEL上。

下面是Rabbitmq使用的一些配置

@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(<<HOST NAME>>);
cachingConnectionFactory.setUsername(<<USERNAME>>);
cachingConnectionFactory.setPassword(<<PASSWORD>>);
cachingConnectionFactory.setChannelCacheSize(50);
return cachingConnectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setMaxConcurrentConsumers(50);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setDefaultRequeueRejected(false);  /** DLQ is in place **/
return factory;
}

消费者使用Spring WebClient同步方式进行下游API调用。以下是Web客户端的配置

@Bean
public WebClient webClient() {
ConnectionProvider connectionProvider = ConnectionProvider
.builder("fixed")
.lifo()
.pendingAcquireTimeout(Duration.ofMillis(200000))
.maxConnections(16)
.pendingAcquireMaxCount(3000)
.maxIdleTime(Duration.ofMillis(290000))
.build();

HttpClient client = HttpClient.create(connectionProvider);
client.tcpConfiguration(<<connection timeout, read timeout, write timeout is set here....>>);

Webclient.Builder builder = 
Webclient.builder().baseUrl(<<base URL>>).clientConnector(new ReactorClientHttpConnector(client));

return builder.build();
}

此Web客户端被自动连接到@Service类中

@Autowired
private Webclient webClient;

并在两个地方使用,如下所示。第一名是一个电话-

public DownstreamStatusEnum downstream(String messageid, String payload, String contentType) {
return call(messageid,payload,contentType);
}

private DownstreamStatusEnum call(String messageid, String payload, String contentType) {
DownstreamResponse response = sendRequest(messageid,payload,contentType).**block()**;
return response;
}

private Mono<DownstreamResponse> sendRequest(String messageid, String payload, String contentType) {
return webClient
.method(POST)
.uri(<<URI>>)
.contentType(MediaType.valueOf(contentType))
.body(BodyInserters.fromValue(payload))
.exchange()
.flatMap(response -> response.bodyToMono(DownstreamResponse.class));

}

Other Place需要并行下行调用,现已实现如下

private Flux<DownstreamResponse> getValues (List<DownstreamRequest> reqList, String messageid) {
return Flux
.fromIterable(reqList)
.parallel()
.runOn(Schedulers.elastic())
.flatMap(s -> {
  return webClient
         .method(POST)
         .uri(<<downstream url>>)
         .body(BodyInserters.fromValue(s))
         .exchange()
         .flatMap(response -> {
               if(response.statusCode().isError()) {
                  return Mono.just(new DownstreamResponse());
               }
               return response.bodyToMono(DownstreamResponse.class);
            });
        }).sequential();
}

public List<DownstreamResponse> updateValue (List<DownstreamRequest> reqList,String messageid) {
         return getValues(reqList,messageid).collectList().**block()**;
}

应用程序在过去一年左右的时间里一直运行良好。最近,我们看到了一个问题,一个或多个消费者似乎只是停留在默认预取(250)数量的未确认状态的消息。解决此问题的唯一方法是重新启动应用程序。

我们最近没有进行任何代码更改。此外,最近也没有任何基础设施变化。

发生这种情况时,我们会进行线程转储。观察到的模式是相似的。大多数消费者线程处于TIMED_WAITING状态,有一到两个消费者线程处于等待状态,堆栈如下-

"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
  java.lang.Thread.State: WAITING (parking)
 - parking to wait for ......
 at .......
 at .......
 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
 at reactor.core.publisher.Mono.block(....
 at .........WebClientServiceImpl.call(...
   

另见下文-

"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-13" waiting for condition ...
 java.lang.Thread.State: WAITING (parking)
 - parking to wait for ......
 at .......
 at .......
 at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(......
 at reactor.core.publisher.Mono.block(....
 at .........WebClientServiceImpl.updateValue(...

不确定此线程转储是否显示使用者线程实际上停滞在此 阻止呼叫(&Q;)。

请帮助建议此处可能存在的问题以及需要采取哪些步骤来解决此问题。早些时候,我们认为这可能是Rabbitmq/SpringAqMP的问题,但基于线程转储,看起来像是WebClient阻止调用的问题。

添加Blockhound时,它打印在日志文件中的堆栈跟踪下-

Error has been observed at following site(s) 
  Checkpoint Request to POST https://....... [DefaultWebClient]
Stack Trace:
      at java.lang.Object.wait
      ......
      at java.net.InetAddress.checkLookupTable
      at java.net.InetAddress.getAddressFromNameService
      ......
      at io.netty.util.internal.SocketUtils$8.run
      ......
      at io.netty.resolver.DefaultNameResolver.doResolve 

推荐答案

抱歉,刚意识到并行通量调用中的Flat Map实际上是这样的

.flatMap(response -> {
  if(response.statusCode().isError()) {
     return Mono.just(new DownstreamResponse());
  }
  return response.bodyToMono(DownstreamResponse.class);
});

所以在错误情况下,我认为底层连接没有被正确释放。当我像下面这样更新它时,它似乎已经解决了这个问题-

.flatMap(response -> {
  if(response.statusCode().isError()) {
     response.releaseBody().thenReturn(Mono.just(new DownstreamResponse()));
  }
  return response.bodyToMono(DownstreamResponse.class);
});

这篇关于在Spring AMQP中正确使用Spring WebClient的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆