Java应用程序:顺序工作流模式 [英] Java application: Sequence workflow pattern

查看:285
本文介绍了Java应用程序:顺序工作流模式的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个spring web应用程序。当用户调用保存端点时,系统应执行许多外部调用以将状态保存在多个微服务中。但是,这些步骤相互依赖。换句话说,我有一系列要执行的步骤。 序列模式

I have a spring web application. When a user calls save endpoint, system should execute many external calls to save the state in multiple microservices. However, those steps depend on each other. In other words I have a sequence of steps to perform. sequence pattern

只是逐个调用一组步骤并不是什么大问题,我可以为每一步创建类,并逐步调用它们,在步骤之间进行适当的修改。

Just calling a set of steps one by one is not a big deal, I can just create class for each step and call them one by one doing appropriate modifications between the steps.

但是,每个步骤都可能失败,如果发生,应该正确报告给用户。以下是直接解决方案的伪代码:

However, each of the steps can fail and if it happens it should be properly reported to the user. Here is a pseudo-code for a straight forward solution:

var response = new Response()
try {
    var result1 = step1.execute(args1)
    var args2 = process(result1, args1)
    var result2 = step2.execute(args2)
    ...
catch(Step1Exception e) {
    response.setIsPartialSuccess(true);
    response.setPartialResults(e.getDetails())
}
catch(Step2Exception e) {
    response.setIsPartialSuccess(true);
    response.setPartialResults(e.getDetails())
}
return response; 

每个步骤都可以处理项目列表。有些步骤会立即发送所有项目(要么全部失败,要么都没有),有些步骤将逐个发送(一半可能失败,一半可以通过)。 StepException将包含该信息,即传递的内容,失败的内容。

Each step can process list of items. Some steps will send all items at once (either they all fail, or none), some steps will send them one by one (half can fail, half can pass). StepException will contain that information, i.e. what passed, what failed.

正如您所看到的,它不是真正可维护的。使用Spring Batch在这里会有点过分,因为我不是在阅读和写东西,我不需要任何多线程,工作细节或检查点。但是,这个想法非常相似,我想创建一些构建块并控制流程。

As you can see, it's not really maintainable. Using Spring Batch would be overkill here, because I am not reading and writing something, I don't need any multithreading, job details, or checkpoints. However, the idea is quite similar, I want to create some building blocks and control the flow.

目前我正在试图弄清楚Spring Reactor是否可以在这里提供帮助(是的,我知道这是出于不同的目的),因为它有流/管道,有一些错误处理。想象一下,我可以这样写:

At the moment I am trying to figure out if Spring Reactor can help here (yes I know it's for different purposes) since it has stream/pipe with some error handling. Imagine I could write something like:

var context = new Context(response, args1);
Mono.just(context)
    .map(step1::execute)
    .onErrorReturn(e -> context.withError(e))
    //I assume if error happened before
    //steps below are not executed
    .map(step2::execute) 
    .onErrorReturn(e -> context.withError(e))
    .block()
 return context;




您可以将被动应用程序处理的数据视为通过流水线。反应器既是传送带又是工作站。原材料从源(原始发布者)注入,最终作为成品准备推送给消费者(或订阅者)。

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).

原材料可以经过各种转换和其他中间步骤,也可以成为将中间件聚集在一起的更大装配线的一部分。如果在某一点出现毛刺或堵塞(也许装箱产品需要不成比例的长时间),受影响的工作站可以向上游发出信号以限制原材料的流动。

The raw material can go through various transformations and other intermediary steps or be part of a larger assembly line that aggregates intermediate pieces together. If there is a glitch or clogging at one point (perhaps boxing the products takes a disproportionately long time), the afflicted workstation can signal upstream to limit the flow of raw material.

换句话说,我正在寻找一个类似于上面的框架。我现在不需要任何异步处理或重试,但它们将来可能会有用。请告诉我是否有比我更有需要的东西。

In other words I am searching for a framework which is similar to the above. I don't need any async processing or retries now, but they may be useful in the future. Please, let me know if there is something better than reactor for my needs.

推荐答案

即使你不需要非现在阻止异步调用,Reactor仍然可以很好地适应这种情况,因为它擅长编排那种处理管道。我认为Java 8 Stream 也适合这个法案,但在这方面效果不那么强。

Even though you don't need non-blocking asynchronous calls right now, Reactor can still be a good fit for this because it is good at orchestrating that sort of processing pipeline. I'd argue Java 8 Stream could also fit the bill but is a little less powerful in that regard.

为了清晰起见,扩展方法参考,并且我的一些猜测,你的代码在Reactor中看起来像这样:

Expanding method references for clarity, and with a bit of guesswork on my part, your code would look like something like this in Reactor:

var response = Mono.just(initialArgs)
    .flatMap(args1 -> Mono.fromCallable(() -> step1.execute(args1))
        .map(result1 -> process(result1, args1) //args1 still in scope inside flatMap
    )
    .flatMap(args2 -> Mono.fromCallable(() -> step2.execute(args2))
    //alternatively to last flatMap, with caveat:
    //.map(args2 -> step2.execute(args2))
    .map(endResult -> new Response(endResult))
    .onErrorResume(error -> {
        Response errorResponse = new Response();
        errorResponse.setIsPartialSuccess(true);
        errorResponse.setPartialResults(error.getDetails());
        return Mono.just(errorResponse);
    })
    .block();

此特定链中使用的运算符不会更改线程,因此这将在线程中执行调用最后一个 block()方法。

Operators used in this particular chain don't change threads, so this would all execute in the thread from which the last block() method is called.

任何步骤的错误都会停止整个处理并传播到结束( block()然后抛出异常)。

Errors from any step stop the whole processing and are propagated to the end (block() would then throw the exception).

注意一些运营商(主要是那些运营商)一个时间的概念)改变线程,此时 stepX.execute 被阻塞成为一个问题因为这将阻止应该由整个Reactor代码共享的线程(不仅是特定的处理管道)而且资源有限。

Note that some operators (mostly those that have a notion of time) change threads, and at this point stepX.execute being blocking becomes a problem because that would block threads that are supposed to be shared by the whole of Reactor code (not only a particular processing pipeline) and are limited resources.

这篇关于Java应用程序:顺序工作流模式的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆