卡夫卡侦听器中的钩子 [英] Hooks in Kafka Listener

查看:154
本文介绍了卡夫卡侦听器中的钩子的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在kafka收听消息之前/之后,是否有任何可用的钩子?

Is there any sort of hooks available before / after kafka listen a message ?

用例: 必须设置MDC关联ID才能执行日志可追溯性

Use case : MDC co-relation id has to be set for to perform the log traceability

我正在寻找什么? 一种事前/事后回调方法,以便可以在进入时设置MDC关联ID,并在退出时最终清除MDC.

What I am looking for ? A before/after call back method so that MDC co-relation id can be set on entry and eventually clean MDC upon exit.

编辑场景: 作为Kafka标头的一部分,我正在获取关联ID,一旦在Kafka Listener中收到消息,我想在MDC中设置相同的ID.

Edited Scenario: I am getting co-relation id as a part of Kafka Headers and I want to set the same in MDC as soon as I receive a message in Kafka Listener

感谢帮助

推荐答案

您可以在侦听器bean中添加周围建议...

You can add an around advice to your listener bean...

@SpringBootApplication
public class So59854374Application {

    public static void main(String[] args) {
        SpringApplication.run(So59854374Application.class, args);
    }

    @Bean
    public static BeanPostProcessor bpp() { // static is important
        return new BeanPostProcessor() {

            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                if (bean instanceof MyListener) {
                    ProxyFactoryBean pfb = new ProxyFactoryBean();
                    pfb.setTarget(bean);
                    pfb.addAdvice(new MethodInterceptor() {

                        @Override
                        public Object invoke(MethodInvocation invocation) throws Throwable {
                            try {
                                System.out.println("Before");
                                return invocation.proceed();
                            }
                            finally {
                                System.out.println("After");
                            }
                        }

                    });
                    return pfb.getObject();
                }
                return bean;
            }

        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so59854374", "foo");
    }

}

@Component
class MyListener {

    @KafkaListener(id = "so59854374", topics = "so59854374")
    public void listen(String in) {
        System.out.println(in);
    }

}

Before
foo
After

编辑

如果将@Header("myMdcHeader") byte[] mdc作为附加参数添加到kafka侦听器方法中,则可以在调用上使用getArguments()[1].

If you add @Header("myMdcHeader") byte[] mdc as an additional parameter to your kafka listener method, you can use getArguments()[1] on the invocation.

另一种解决方案是将RecordInterceptor添加到侦听器容器工厂,这允许您在将原始ConsumerRecord传递给侦听器适配器之前对其进行访问.

Another solution is to add a RecordInterceptor to the listener container factory, which allows you to access the raw ConsumerRecord before it is passed to the listener adapter.

/**
 * An interceptor for {@link ConsumerRecord} invoked by the listener
 * container before invoking the listener.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @since 2.2.7
 *
 */
@FunctionalInterface
public interface RecordInterceptor<K, V> {

    /**
     * Perform some action on the record or return a different one.
     * If null is returned the record will be skipped.
     * @param record the record.
     * @return the record or null.
     */
    @Nullable
    ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}

/**
 * Set an interceptor to be called before calling the listener.
 * Does not apply to batch listeners.
 * @param recordInterceptor the interceptor.
 * @since 2.2.7
 */
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
    this.recordInterceptor = recordInterceptor;
}

如果您使用的是批处理侦听器,Kafka将提供ConsumerInterceptor.

If you are using a batch listener, Kafka provides a ConsumerInterceptor.

这篇关于卡夫卡侦听器中的钩子的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆