Spring Boot - Kafka 消费者 Bean 范围 [英] Spring Boot - Kafka Consumer Bean Scope

查看:80
本文介绍了Spring Boot - Kafka 消费者 Bean 范围的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在具有 SCOPE_REQUEST 范围的 Spring Boot 应用程序中使用 CacheManager.

I'm using a CacheManager in a Spring Boot application with SCOPE_REQUEST scope.

@Bean
@Scope(value = WebApplicationContext.SCOPE_REQUEST, proxyMode = ScopedProxyMode.TARGET_CLASS)
public CacheManager cacheManager() {
  return new ConcurrentMapCacheManager();
}

我也在使用 Kafka 进行微服务之间的通信.实际上,我正在通过 Kafka 消费者接收一个事件,但出现以下错误:

I'm also using Kafka for communication between microservices. Actually I'm receiving an event through a Kafka consumer and I get the following error:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'scopedTarget.cacheManager': Scope 'request' is not active for the current thread;
...
Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread?

很明显,侦听器线程上缺少 CacheManager bean.我的目标是让 Spring Boot/Kafka 框架为每个消费的 Kafka 事件创建平均值,就像它为 Web 请求一样.我不知道如何才能做到这一点,有人可以帮助我吗?

It's clear that the CacheManager bean is missing on the listener thread. My goal is to have let the Spring Boot/Kafka framework to create the mean for each consumed Kafka events just as it's for the web requests. I have no idea how I could achive that, could someone help me ?

非常感谢,祝你有美好的一天!

Thank you so much, Have a nice day!

推荐答案

@Gary Russel既真又假,同时我成功找到了解决方案,创建了以下类:

@Gary Russel That's true and false at the same time, meantime I succeed to find a solution, create the below class:

    public class KafkaRequestScopeAttributes implements RequestAttributes {

      private Map<String, Object> requestAttributeMap = new HashMap<>();

      @Override
      public Object getAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
          return this.requestAttributeMap.get(name);
        }
        return null;
      }

      @Override
      public void setAttribute(String name, Object value, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
          this.requestAttributeMap.put(name, value);
        }
      }

      @Override
      public void removeAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
          this.requestAttributeMap.remove(name);
        }
      }

      @Override
      public String[] getAttributeNames(int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
          return this.requestAttributeMap.keySet().toArray(new String[0]);
        }
        return new String[0];
      }

      @Override
      public void registerDestructionCallback(String name, Runnable callback, int scope) {
        // Not Supported
      }

      @Override
      public Object resolveReference(String key) {
        // Not supported
        return null;
      }

      @Override
      public String getSessionId() {
        return null;
      }

      @Override
      public Object getSessionMutex() {
        return null;
      }
    }

然后将以下两行添加到您的 KafkaListener 方法的开始和结束中:

then add the following two lines into your KafkaListener method's start and end:

RequestContextHolder.setRequestAttributes(new KafkaRequestScopeAttributes());
RequestContextHolder.resetRequestAttributes();

通过这样做,您可以强制在 Kafka 侦听器中创建 REQUEST_SCOPE.

By doing that you can force to create the REQUEST_SCOPE in a Kafka Listener.

这篇关于Spring Boot - Kafka 消费者 Bean 范围的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆