Axon 4:从不同线程应用事件时未触发 EventSourcingHandler [英] Axon 4: EventSourcingHandler not triggered when applying event from a different thread

查看:84
本文介绍了Axon 4:从不同线程应用事件时未触发 EventSourcingHandler的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在 Axon 4 中的命令处理中遇到了一个小问题.

I've encountered a little issue with command handling in Axon 4.

假设我有一个在处理命令时需要调用外部服务的聚合.

Let say I have an aggregate that need to call an external service when handling a command.

外部服务使用异步客户端(vertx tcp 客户端 + rxjava),因此响应在与创建聚合实例的线程不同的线程中给出.

The external service uses an asynchronous client (vertx tcp client + rxjava), so the response is given in a different thread than the one that created the aggregate instance.

我想根据我的服务结果应用一个事件,但它不起作用,因为 AggregateLifecycle.apply() 调用在不同的线程上...

I want to apply an event given the result of my service, but it does not work because the AggregateLifecycle.apply() call is on a different thread...

如何转移"聚合的范围?

How can I "transfert" the scope of the aggregate ?

这是我想做的一个小例子(使用 rxjava 2 和 lombok):

Here is a little exemple of what I want to do (uses rxjava 2 and lombok):

聚合:

@Slf4j
@Aggregate
@NoArgsConstructor
public class MyAggregate {

    @AggregateIdentifier
    private String id;

    @CommandHandler
    public MyAggregate(CreationCommand creationCommand) {
        Single.just("some data")
                .observeOn(Schedulers.computation()) // <- comment this line and the test pass, uncomment and it fail because apply is on another thread ?
                .subscribe((s, throwable) -> apply(new AggregateCreatedEvent(creationCommand.getId())));
    }

    @EventSourcingHandler
    public void on(AggregateCreatedEvent event) {
        this.id = event.getId();
    }
}

@Value class CreationCommand { String id; }
@Value class AggregateCreatedEvent { String id;}

测试:

public class MyAggregateTest {

    AggregateTestFixture<MyAggregate> testFixture = new AggregateTestFixture<>(MyAggregate.class);

    @Test
    public void test() {
        testFixture.givenNoPriorActivity()
                .when(new CreationCommand("123"))
                .expectEvents(new AggregateCreatedEvent("123"));
    }
}

这是我遇到的错误:

java.lang.IllegalStateException: Cannot request current Scope if none is active

推荐答案

必须在管理该工作单元的线程中应用该事件,在本例中为 CommandHandler.Axon 为异步操作提供了自己的机制.commandBus 异步接受命令,事件由事件处理器异步处理.异步实现 CommandHandler 也没有任何好处,无论如何,目前不支持.

The event must be applied in the thread that manages that unit of work, in this case the CommandHandler. Axon provides its own mechanisms for asynchronous operations. The commandBus accepts commands asynchronously, and events are processed by the event processors asynchronously. There's nothing to be gained from also implementing your CommandHandler asynchronously, and in any case, it's not supported at this time.

应用事件所需的所有数据通常应在命令或聚合状态中可用,而不是来自其他外部来源.

All the data that you need to apply the event should normally be available in the command or in the aggregate state, not from additional external sources.

这可能是您希望命令处理程序的样子:

This is probably What you want your command handler to look like:

@CommandHandler
public MyAggregate(CreationCommand creationCommand) {
    apply(new AggregateCreatedEvent(creationCommand.getId());
}

这篇关于Axon 4:从不同线程应用事件时未触发 EventSourcingHandler的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆