关闭scala.concurrent.Future的onFailure继续未按预期工作 [英] Closure in onFailure continuation for a scala.concurrent.Future not working as expected

查看:102
本文介绍了关闭scala.concurrent.Future的onFailure继续未按预期工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我遇到一个有两个方法的问题,第一个方法在循环中调用第二个方法,第二个方法创建 Future 如下:

I'm encountering an issue where I have two methods, the first calling the second in a loop and the second creating a Future as follows:

public class WorkersCoordinator {
    private static Logger LOGGER = 
        LoggerFactory.getLogger(WorkersCoordinator.class);

    private final Timeout timeout;

    private final ActorSystem system;

    private final List<Class<? extends BaseWorker>> workers;

    private final Map<Class, ActorRef> refMap;

    private final WorkResultPackageQueue pkgQueue;

    private final ActorFactory actorFactory;

    @Autowired
    public WorkersCoordinator(final ApplicationConfiguration config,
                             final ActorSystem system,
                             final ActorFactory actorFactory, 
                             final WorkerFactory workerFactory,
                             final WorkResultPackageQueue pkgQueue) {
        timeout = new Timeout(config.getTimeoutInMilliseconds(), 
                              TimeUnit.MILLISECONDS);

        this.system = system;
        this.actorFactory = actorFactory;
        this.pkgQueue = pkgQueue;

        refMap = Map.newHashMap();
        workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
    }

    public void delegateWorkers() {
        for (Class<? extends BaseWorker> worker : workers) {
            if (refMap.containsKey(worker) continue;
            sendWork(worker);
        }
    }

    private void sendWork(Class<? extends BaseWorker> worker) {
        // GetDataActor extends AbstractActor
        ActorRef actorRef = actorFactory.create(GetDataActor.class);
        Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

        responseRef.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) throws Throwable {
                LOGGER.error("Worker {} encountered a problem - cancelling.", 
                             worker.getSimpleName());
                if (refMap.containsKey(worker)) {
                    refMap.remove(worker);
                }
            }
        }, system.dispatcher());

        responseRef.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                if (msg instanceof WorkResultPackage) {
                    final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                    LOGGER.info(
                        "Received AssetDataPackage from {}, containing {} items",
                        reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                        reportPackage.getPkg().getData().size());

                    pkgQueue.enqueue(reportPackage);
                } else {
                    LOGGER.eror(
                        "Expected to receive WorkResultPackage Object but received: {}",
                        msg.getClass());
                        throw new UnknownObjectReceived(msg);
                }
            }
        }, system.dispatcher());

        refMap.put(worker, actorRef);
    }
}

问题是我相信 responseRef.onFailure 的行为不符合我的预期。如果我与3个工作人员(称为我配置为失败的工作人员)一起调用此操作,则会处理一次故障,但是日志记录不确定哪个工作人员将被报告为发生故障,即使它应该始终是我标记为失败的工作人员。我是这个技术堆栈(Java,Scala Futures和AKKA)的新手,并且是建立代码的基础,可以在其中找到这个建立的模式,因此我不知道我是在忽略Java / Scala Future中的某些东西还是误解闭包。这里的症结在于,我需要报告哪个工作程序失败,然后将其从 refMap 中删除​​,这样就不再考虑它了。甚至更奇怪的是,即使报告错误的工作人员失败,所有工作人员似乎都已完成并从 refMap 中删除​​。

The issue is that the closure I believe responseRef.onFailure makes doesn't act as I would expect. If I call this with 3 workers, one of which I configure to fail, a failure is handled but the logging is indeterminate as to which worker will be reported as having failed even though it should be consistently the one I marked to fail. I'm new to this tech stack (Java, Scala Futures, and AKKA) and the established code base in which I find this established pattern so I don't know if I'm overlooking something or misunderstanding closures in Java/Scala Futures. The sticking point here is that I need to report which worker failed and remove it from the refMap so that it will no longer be considered in process. Even stranger is the fact that all workers appear to complete and to be removed from refMap even while the wrong worker is reported as failed.

更新:在没有运气回答为什么闭包不起作用之后,我做了一些调查,发现了另一个帖子,回答Java 8是否甚至支持闭包:

Update: After having no luck with getting an answer why the closure wasn't working, I did some investigation and found another post answering whether Java 8 even supports closures:

Java 8是否支持闭包?

简短的答案,我相信是的。但是,它提到了 final 或实际上是 final 变量。因此,我更新了我的代码,如下所示。希望这会使了解闭包的人能够帮助我理解为什么闭包不如我以前所用(在C#和JavaScript中)。我只发布对 sendWork(...)的更新,以突出显示我尝试无济于事的努力。

Short answer, I believe it does. However, it spoke of final or effectively final variables. Thus, I updated my code as follows. Hopefully, this gets folks who understand closures to help me understand why they don't work as I'm used to (in C# and JavaScript). I'm only posting updates to sendWork(...) to highlight efforts I tried to no avail.

private void sendWork(Class<? extends BaseWorker> worker) {
    // GetDataActor extends AbstractActor
    ActorRef actorRef = actorFactory.create(GetDataActor.class);
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

    Consumer<Throwable> onFailureClosure = (ex) -> {
            LOGGER.error("Worker {} encountered a problem - cancelling.", 
                         worker.getSimpleName());
            if (refMap.containsKey(worker)) {
                refMap.remove(worker);
            }
    }

    responseRef.onFailure(new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            onFailureClosure.accept(failure);
        }
    }, system.dispatcher());

    responseRef.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object msg) throws Throwable {
            if (msg instanceof WorkResultPackage) {
                final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                LOGGER.info(
                    "Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());

                pkgQueue.enqueue(reportPackage);
            } else {
                LOGGER.eror(
                    "Expected to receive WorkResultPackage Object but received: {}",
                    msg.getClass());
                    throw new UnknownObjectReceived(msg);
            }
        }
    }, system.dispatcher());

    refMap.put(worker, actorRef);
}


推荐答案

存在一个基本问题可能导致您所看到的行为的代码:该代码正在并发环境中对数据进行突变,而没有对该数据的任何保护措施。将来的回调可以随时执行,并且有可能并行运行。将来会有多个回调检查和更改同一数据会导致奇怪的行为。

There is a fundamental problem with the code that might be contributing to the behavior that you're seeing: the code is mutating data in a concurrent environment without any safeguards for that data. Future callbacks can be executed at any time and can potentially run in parallel. Having multiple future callbacks inspecting and mutating the same data can cause weird behavior.

Java处理可变数据并发访问的典型方法是使用同步和锁定。幸运的是,由于您使用的是Akka,因此有一种更好的方法。基本上,将 WorkersCoordinator 重构为一个参与者,并使用顺序消息处理的参与者行为来安全处理可变状态

The typical approach in Java to deal with concurrent access to mutable data is to use synchronization and locks. Fortunately, since you're using Akka, there is a better approach. Basically, refactor WorkersCoordinator to be an actor, and use the actor behavior of sequential message processing to safely handle the mutable state.

要进一步简化问题,您可以放弃在这种情况下使用,而是使用 tell 在参与者之间进行交流。我猜想这里会使用期货来捕获错误,但是处理错误的更好方法是Akka的主管策略。也就是说,如果 WorkersCoordinator 是演员,并且每个 GetDataActor 实例都是 WorkersCoordinator的子代,那么后者可以决定如何处理前者中出现的错误。例如,如果 GetDataActor 中引发异常,则协调器可以决定记录该错误,然后停止该子项。

To further simplify matters, you can forgo the use of ask in this case and instead use tell to communicate between actors. I'm guessing that futures are used here in an attempt to capture errors, but a better approach to handling errors is Akka's supervisor strategy. That is, if WorkersCoordinator were an actor and if each GetDataActor instance were a child of WorkersCoordinator, then the latter could decide how to deal with errors that arise in the former. For example, if an exception is thrown in a GetDataActor, the coordinator could decide to log the error, then stop the child.

以下是替代的 WorkersCoordinator ,其中包含上述思想:

The following is an alternative WorkersCoordinator that incorporates the above ideas:

public class WorkersCoordinator extends AbstractActor {
  private static Logger LOGGER = ...
  private final List<Class<? extends BaseWorker>> workers;
  private final Map<ActorRef, Class> refMap;
  private final WorkResultPackageQueue pkgQueue;

  public WorkersCoordinator(final WorkerFactory workerFactory,
                            final WorkResultPackageQueue pkgQueue) {
    this.pkgQueue = pkgQueue;
    this.refMap = Map.newHashMap();
    this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
  }

  static Props props(WorkerFactory factory, WorkResultPackageQueue queue) {
    return Props.create(WorkersCoordinator.class, factory, queue);
  }

  static public class Delegate {}

  private static SupervisorStrategy strategy =
    new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder.
      matchAny(t -> {
         ActorRef failedChild = getSender();
         LOGGER.error("This child failed: {}", failedChild);
         refMap.remove(failedChild);
         stop();
      })
      .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  @Override
  public void preStart() {
    for (worker : workers) {
       ActorRef child = getContext().actorOf(Props.create(GetDataActor.class));
       refMap.put(child, worker);
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Delegate.class, d -> {
        refMap.forEach((actor, msg) -> actor.tell(msg, getSelf()));
      })
      .match(WorkResultPackage.class, p -> {
        LOGGER.info("Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());

        pkgQueue.enqueue(p);
        ActorRef dataActor = getSender();
        refMap.remove(dataActor);
      })
      .matchAny(
        msg -> LOGGER.eror("Expected to receive WorkResultPackage Object but received: {}", msg)
      )
      .build();
  }
}

关于上述代码的一些说明:

Some notes about the above code:


  • 而不是使用 ActorFactory ,它似乎是一些自定义类, 道具

  • refMap 被倒置,因此 ActorRef 现在是键,并且作品的类别类型就是价值。这使得我们可以基于 ActorRef refMap 中删除​​一个条目,这两种情况都来自于子演员,并且在孩子抛出异常的情况下。

  • 为简单起见,我删除了 @Autowired 批注。在此处中可以找到有关与参与者进行依赖注入的更多信息。

  • 要创建并启动 WorkersCoordinator ,请调用其 props 方法。要启动工作,演员需要一个自定义的 Delegate 消息:演员收到此消息后,将通过 refMap 进行迭代。 ,向地图中的每个孩子发送与该孩子相关的工作单位。

  • Instead of using ActorFactory, which appears to be some custom class, Props is used instead.
  • refMap is inverted so that the ActorRef is now the key, and the class type of the work is the value. This allows us to remove an entry from refMap based on the ActorRef, both in the event of a successful response from a child actor and in the event of a child throwing an exception.
  • I removed the @Autowired annotation for simplicity. More information on dependency injection with actors is found here.
  • To create and start a WorkersCoordinator, invoke its props method. To initiate the work, the actor expects a custom Delegate message: once the actor receives this message, it iterates through refMap, sending each child in the map the unit of work that is associated with that child.

WorkerFactory factory = ...
WorkResultPackageQueue queue = ...
ActorRef coordinator = actorSystem.actorOf(WorkersCoordinator.props(factory, queue));

Delegate doWork = new Delegate();
coordinator.tell(doWork, ActorRef.noSender());

这篇关于关闭scala.concurrent.Future的onFailure继续未按预期工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆