使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal [英] Using Hooks and a Lift to push Context into ThreadLocal

查看:98
本文介绍了使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我问自己是否有办法在订阅者收到 onNext 信号之前将反应性上下文推送到 ThreadLocal 变量中.在研究 reactor-core 时,我发现了 Hooks 类和 Lift BiFunction.

I was asking myself if there was a way to push the reactive context into a ThreadLocal variable before a subscriber received the onNext signal. While digging inside reactor-core, I've found Hooks class and Lift BiFunction.

我创建了一个具有以下实现的类.该类由一个 ThreadLocal 变量组成,该变量将保存 Context 并实现必要的 BiFunction 接口.它会将所有调用委托给实际订阅者,并且还会在调用实际订阅者上的 onNext 之前将上下文(如果修改)推送到 ThreadLocal 变量中.

I've created a class with the following implementation. The class is composed of a ThreadLocal variable that will hold the Context and implements the necessary BiFunction interface. It will delegate all the call to the actual subscriber and will also push the context if modified into the ThreadLocal variable before calling the onNext on the actual subscriber.

package com.example.demo;

import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.util.context.Context;

import java.util.function.BiFunction;

public class ThreadLocalContextLifter<T> implements BiFunction<Scannable, CoreSubscriber<? super T>, CoreSubscriber<? super T>> {

    private static Logger logger = LoggerFactory.getLogger(ThreadLocalContextLifter.class);

    private static final ThreadLocal<Context> contextHolder = new ThreadLocal<>();

    public static Context getContext() {
        Context context = contextHolder.get();
        if (context == null) {
            context = Context.empty();
            contextHolder.set(context);
        }
        return context;
    }

    public static void setContext(Context context) {
        contextHolder.set(context);
    }

    @Override
    public CoreSubscriber<? super T> apply(Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
        return new ThreadLocalContextCoreSubscriber<>(coreSubscriber);
    }

    final class ThreadLocalContextCoreSubscriber<U> implements CoreSubscriber<U> {

        private CoreSubscriber<? super U> delegate;

        public ThreadLocalContextCoreSubscriber(CoreSubscriber<? super U> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Context currentContext() {
            return delegate.currentContext();
        }

        @Override
        public void onSubscribe(Subscription s) {
            delegate.onSubscribe(s);
        }

        @Override
        public void onNext(U u) {
            Context context = delegate.currentContext();
            if (!context.isEmpty()) {
                Context currentContext = ThreadLocalContextLifter.getContext();
                if (!currentContext.equals(context)) {
                    logger.info("Pushing reactive context to holder {}", context);
                    ThreadLocalContextLifter.setContext(context);
                }
            }
            delegate.onNext(u);
        }

        @Override
        public void onError(Throwable t) {
            delegate.onError(t);
        }

        @Override
        public void onComplete() {
            delegate.onComplete();
        }
    }

}

使用以下代码将实例加载到 Hooks 中:

The instance is loaded into the Hooks with the following code:

Hooks.onEachOperator(Operators.lift(new ThreadLocalContextLifter<>()));

我已经运行了一些测试,它似乎工作正常,但我不相信解决方案.我猜这个钩子会降低反应器的性能,或者它在我不知道的某些情况下不起作用.

I've run some tests and it seems to work properly but I'm not convinced by the solution. I'm guessing that the hook will degrade the performance of reactor or that it will not work in some case that I'm not aware of.

我的问题很简单:这是一个坏主意吗?

My question is simple: Is this a bad idea?

推荐答案

我不认为这个想法有什么问题...每个 Reactor 提供的操作符都使用这个钩子.

I don't think there is anything wrong with that idea... The hook is used by every Reactor-provided operator.

ContextonNext 之间不会改变,所以电梯 ThreadLocalContextCoreSubscriber 可以在 onSubscribe 中捕获它.但是您仍然需要在 onNext 中至少检查一次 ThreadLocal,因为 onNextonSubscribe 可以发生在两个不同的线程上,因此您使用 delegate.currentContext() 的解决方案也有效.最后,您的方法看起来很合理.

The Context doesn't change between onNext, so the lift ThreadLocalContextCoreSubscriber could capture it in onSubscribe. But you'd still need to check the ThreadLocal at least once in onNext, since onNext and onSubscribe can happen on two different threads, so your solution of using delegate.currentContext() works too. In the end, your approach looks sound.

这篇关于使用 Hooks 和 Lift 将 Context 推送到 ThreadLocal的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆