CompletableFuture是否具有相应的Local上下文? [英] Does CompletableFuture have a corresponding Local context?

查看:184
本文介绍了CompletableFuture是否具有相应的Local上下文?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在过去,我们有ThreadLocal的程序可以将数据与请求路径一起传送,因为所有请求处理都是在该线程上完成的,诸如Logback之类的东西在MDC.put("requestId", getNewRequestId());

In the olden days, we had ThreadLocal for programs to carry data along with the request path since all request processing was done on that thread and stuff like Logback used this with MDC.put("requestId", getNewRequestId());

然后出现了Scala和函数式编程,接着出现了Future,随之而来的还有Local.scala(至少我知道twitter Future的twitter具有此类). Future.scala知道Local.scala并通过所有map/flatMap等功能传递上下文,以便我仍然可以执行Local.set("requestId", getNewRequestId());,然后在它经过许多线程之后向下游传输,我可以仍然使用Local.get(...)

Then Scala and functional programming came along and Futures came along and with them came Local.scala (at least I know the twitter Futures have this class). Future.scala knows about Local.scala and transfers the context through all the map/flatMap, etc. etc. functionality such that I can still do Local.set("requestId", getNewRequestId()); and then downstream after it has travelled over many threads, I can still access it with Local.get(...)

Soooo,我的问题是在Java中,我可以在带有LocalContext或某个对象(不确定名称)的某个地方对新的CompletableFuture做同样的事情,这样,我可以将Logback MDC上下文修改为将其存储在该上下文中,而不是存储在ThreadLocal中,这样我就不会丢失请求ID,并且我在thenApplythenAccept等上的所有日志都可以正常工作,并且可以记录日志和-XrequestId登录配置中标记.

Soooo, my question is in Java, can I do the same thing with the new CompletableFuture somewhere with LocalContext or some object (not sure of the name) and in this way, I can modify Logback MDC context to store it in that context instead of a ThreadLocal such that I don't lose the request id and all my logs across the thenApply, thenAccept, etc. etc. still work just fine with logging and the -XrequestId flag in Logback configuration.

作为一个例子.如果有请求进入,并且正在使用Log4j或Logback,则在过滤器中,将设置MDC.put("requestId", requestId),然后在您的应用中,您将在以下行记录许多日志语句:

As an example. If you have a request come in and you are using Log4j or Logback, in a filter, you will set MDC.put("requestId", requestId) and then in your app, you will log many log statements line this:

log.info("request came in for url="+url);
log.info("request is complete");

现在,在日志输出中将显示以下内容:

Now, in the log output it will show this:

INFO {time}: requestId425 request came in for url=/mypath
INFO {time}: requestId425 request is complete

这是使用ThreadLocal的技巧来实现的.在Twitter上,我们在Scala中使用Scala和Twitter Future以及Local.scala类. Local.scalaFuture.scala被捆绑在一起,因为我们仍然可以实现上述情况,这仍然非常好,我们的所有log语句都可以记录请求ID,因此开发人员不必记住要记录请求ID并可以进行跟踪单个客户请求具有该ID的响应周期.

This is using a trick of ThreadLocal to achieve this. At Twitter, we use Scala and Twitter Futures in Scala along with a Local.scala class. Local.scala and Future.scala are tied together in that we can achieve the above scenario still which is very nice and all our log statements can log the request id so the developer never has to remember to log the request id and you can trace through a single customers request response cycle with that id.

我在Java中没有看到它:(这很不幸,因为有很多用例.也许我没有看到某些东西?

I don't see this in Java :( which is very unfortunate as there are many use cases for that. Perhaps there is something I am not seeing though?

推荐答案

我的解决方案主题是(它将与JDK 9+一起使用,因为从该版本开始公开了几个可重写的方法)

My solution theme would be to (It would work with JDK 9+ as a couple of overridable methods are exposed since that version)

让完整的生态系统了解MDC

Make the complete ecosystem aware of MDC

为此,我们需要解决以下情况:

And for that, we need to address the following scenarios:

  • 何时我们都可以从此类中获得CompletableFuture的新实例?→我们需要返回一个MDC感知的相同版本.
  • 什么时候都能从此类之外获得CompletableFuture的新实例?→我们需要返回一个MDC感知的相同版本.
  • 在CompletableFuture类中使用哪个执行程序?→在所有情况下,我们需要确保所有执行程序都知道MDC
  • When all do we get new instances of CompletableFuture from within this class? → We need to return a MDC aware version of the same rather.
  • When all do we get new instances of CompletableFuture from outside this class? → We need to return a MDC aware version of the same rather.
  • Which executor is used when in CompletableFuture class? → In all circumstances, we need to make sure that all executors are MDC aware

为此,让我们通过扩展它来创建CompletableFuture的MDC感知版本类.我的版本如下所示

For that, let's create a MDC aware version class of CompletableFuture by extending it. My version of that would look like below

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public CompletableFuture newIncompleteFuture() {
        return new MDCAwareCompletableFuture();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}

MDCAwareForkJoinPool类的样子(为简单起见,已跳过了使用ForkJoinTask参数的方法)

The MDCAwareForkJoinPool class would look like (have skipped the methods with ForkJoinTask parameters for simplicity)

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrapWithMdcContext(task));
    }
}

要包装的实用方法例如

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
   MDC.clear();
   if (contextMap != null) {
       MDC.setContextMap(contextMap);
    }
}

以下是一些使用准则:

  • 使用类MDCAwareCompletableFuture而不是类CompletableFuture.
  • CompletableFuture中的几个方法实例化自身版本,例如new CompletableFuture....对于此类方法(大多数公共静态方法),请使用替代方法来获取MDCAwareCompletableFuture的实例.使用替代方法的示例可能不是使用CompletableFuture.supplyAsync(...),而是可以选择new MDCAwareCompletableFuture<>().completeAsync(...)
  • 当您陷入困境时,可以使用方法getMDCAwareCompletionStageCompletableFuture的实例转换为MDCAwareCompletableFuture,这是因为某些外部库返回了CompletableFuture的实例.显然,您不能在该库中保留上下文,但是在您的代码触及应用程序代码后,此方法仍将保留上下文.
  • 在提供执行程序作为参数时,请确保它是MDC Aware,例如MDCAwareForkJoinPool.您也可以通过覆盖execute方法来创建MDCAwareThreadPoolExecutor来满足您的用例.你明白了!
  • Use the class MDCAwareCompletableFuture rather than the class CompletableFuture.
  • A couple of methods in the class CompletableFuture instantiates the self version such as new CompletableFuture.... For such methods (most of the public static methods), use an alternative method to get an instance of MDCAwareCompletableFuture. An example of using an alternative could be rather than using CompletableFuture.supplyAsync(...), you can choose new MDCAwareCompletableFuture<>().completeAsync(...)
  • Convert the instance of CompletableFuture to MDCAwareCompletableFuture by using the method getMDCAwareCompletionStage when you get stuck with one because of say some external library which returns you an instance of CompletableFuture. Obviously, you can't retain the context within that library but this method would still retain the context after your code hits the application code.
  • While supplying an executor as a parameter, make sure that it is MDC Aware such as MDCAwareForkJoinPool. You could create MDCAwareThreadPoolExecutor by overriding execute method as well to serve your use case. You get the idea!

您可以在 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆