知道Akka演员何时完成 [英] Knowing when akka actors are finished

查看:65
本文介绍了知道Akka演员何时完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一些人和我一起致力于一个项目,他们试图找出解决这个问题的最佳方法。看来这应该是经常需要的标准事物,但是由于某些原因,我们似乎无法获得正确的答案。

There are a few people working on a project along with me that have been trying to figure out the best way to deal with this issue. It seems this should be a standard thing wanted regularly, but for some reason we can't seem to get the right answer.

如果我有一些工作要做,并且我在路由器上扔了一堆消息,怎么知道什么时候完成所有工作?例如,如果我们正在读取一百万行文件的行并将该行发送给演员以进行处理,而您需要处理下一个文件,但是必须等待第一个文件完成,那么如何知道它何时完成了吗?

If I have some work to be done and I throw a bunch of messages at a router, how can I tell when all the work is done? For example, if we're reading lines of a 1 million line file and sending the line off to actors to process this, and you need to process the next file, but must wait for the first to complete, how can you know when it is complete?

另外一条评论。我知道并已将Await.result()和Await.ready()与Patters.ask()一起使用。一个区别是,每条线都有一个期货,而我们将有大量等待这些期货的数组,而不仅仅是一个。此外,我们正在填充一个占用大量内存的大型域模型,并且不希望添加额外的内存以在等待组成的内存中保存相等数量的期货,而使用actor时每个参与者在完成工作后都会完成而不保存内存

One further comment. I'm aware and have used Await.result() and Await.ready() used with Patters.ask(). One difference is, each line would have a Future and we'd have a HUGE array of these futures to wait on, not just one. Additionally, we are populating a large domain model taking up considerable memory, and do not wish to add additional memory for holding an equal number of futures in memory waiting to be composed, while using actors each one completes after doing it's work not holding memory waiting to be composed.

我们使用的是Java,而不是Scala。

We're using Java and not Scala.

伪代码:

for(File file : files) {
    ...
    while((String line = getNextLine(fileStream)) != null) {
        router.tell(line, this.getSelf());
    }
    // we need to wait for this work to finish to do the next
    // file because it's dependent on the previous work
}

似乎您经常想做很多工作,并且知道演员什么时候结束。

It would seem you'd often want to do a lot of work and know when it's finished with actors.

推荐答案

我相信我有一个适合您的解决方案,它不涉及累积整个 Future s。首先,高层次的概念。将有两名演员参与此流程。首先,我们将其称为 FilesProcessor 。这个演员将短暂而有状态。每当您要顺序处理一堆文件时,都可以启动该actor的实例,并向其传递一条消息,其中包含要处理的文件的名称(或路径)。完成所有文件的处理后,它会自行停止。第二个参与者我们称为 LineProcessor 。这个角色是无状态的,长期存在并聚集在路由器后面。它处理文件行,然后响应请求该行处理的任何人,告诉他们已完成该行的处理。现在进入代码。

I believe I have a solution for you and it does not involve accumulating a whole bunch of Futures. First, the high level concept. There will be two actors participating in this flow. The first we'll call FilesProcessor. This actor will be short lived and stateful. Whenever you want to process a bunch of files sequentially, you spin up an instance of this actor and pass it a message containing the names (or paths) of the files you want to process. When it has completed processing of all of the files, it stops itself. The second actor we will call LineProcessor. This actor is stateless, long lived and pooled behind a router. It processes a file line and then responds back to whoever requested the line processing telling them it has completed processing that line. Now onto the code.

首先发送消息:

public class Messages {

  public static class ProcessFiles{
    public final List<String> fileNames;
    public ProcessFiles(List<String> fileNames){
      this.fileNames = fileNames;
    }
  }

  public static class ProcessLine{
    public final String line;
    public ProcessLine(String line){
      this.line = line;
    }
  }

  public static class LineProcessed{}

  public static LineProcessed LINE_PROCESSED = new LineProcessed();
}

FilesProcessor

public class FilesProcessor extends UntypedActor{
  private List<String> files;
  private int awaitingCount;
  private ActorRef router;

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessFiles){      
      ProcessFiles pf = (ProcessFiles)msg;
      router = ... //lookup router;
      files = pf.fileNames;
      processNextFile();
    }
    else if (msg instanceof LineProcessed){
      awaitingCount--;
      if (awaitingCount <= 0){
        processNextFile();
      }
    }

  }

  private void processNextFile(){
    if (files.isEmpty()) getContext().stop(getSelf());
    else{            
      String file = files.remove(0);
      BufferedReader in = openFile(file);
      String input = null;
      awaitingCount = 0;

      try{
        while((input = in.readLine()) != null){
          router.tell(new Messages.ProcessLine(input), getSelf());
          awaitingCount++;
        }        
      }
      catch(IOException e){
        e.printStackTrace();
        getContext().stop(getSelf());
      }

    }
  }

  private BufferedReader openFile(String name){
    //do whetever to load file 
    ...
  }

}

c $ c> LineProcessor :

And the LineProcessor:

public class LineProcessor extends UntypedActor{

  @Override
  public void onReceive(Object msg) throws Exception {
    if (msg instanceof ProcessLine){
      ProcessLine pl = (ProcessLine)msg;

      //Do whatever line processing...

      getSender().tell(Messages.LINE_PROCESSED, getSelf());
    }
  }

}

现在线路处理器正在发回没有其他内容的响应。如果您需要根据行的处理将某些内容发送回去,则可以肯定地更改此设置。我确定这段代码不是防弹的,我只是想向您展示一个高级概念,说明如何在没有请求/响应语义和 Future s的情况下完成此流程。

Now the line processor is sending a response back with no additional content. You could certainly change this if you needed to send something back based on the processing of the line. I'm sure this code is not bullet proof, I just wanted to show you a high level concept for how you could accomplish this flow without request/response semantics and Futures.

如果您对此方法有任何疑问或需要更多详细信息,请告诉我,我很乐意提供。

If you have any questions on this approach or want more detail, let me know and I'd be happy to provide it.

这篇关于知道Akka演员何时完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆