Knowing when Akka actors are finished

Several people working on a project with me have been trying to figure out the best way to handle this problem. It seems like something that must come up regularly, yet we cannot find the right answer.

If I have some work to do and I throw a bunch of messages at a router, how can I tell when all of the work is done? For example, suppose we read the lines of a 1-million-line file and send each line to the actors for processing, and we then need to process the next file, but only after the first one has finished. How can we know when it has finished?

One more note. I know about, and have used, Await.result() and Await.ready() together with Patterns.ask(). The difference here is that each line would have its own Future, so we would have a huge array of futures to wait on rather than just one. In addition, we are populating a large domain model that takes up significant memory, and we do not want to spend additional memory holding an equal number of futures waiting to be completed. With plain actors, each one is done as soon as it finishes its work, and nothing is held in memory waiting for a result.

We use Java, not Scala.

Pseudocode:

    for (File file : files) {
        ...
        String line;
        while ((line = getNextLine(fileStream)) != null) {
            router.tell(line, this.getSelf());
        }
        // we need to wait for this work to finish before starting the next
        // file, because it depends on the previous work
    }

It would seem that you would often want to do a lot of work and know when you are done with the actors.

2 answers

I believe I have a solution for you, and it does not involve accumulating a whole bunch of Futures. First, the high-level concept. Two actors participate in this flow. The first we will call FilesProcessor. This actor is short-lived and stateful. Whenever you want to process multiple files sequentially, you create an instance of this actor and send it a message containing the names (or paths) of the files you want to process. When it has finished processing all the files, it stops itself. The second actor we will call LineProcessor. This actor is stateless, long-lived, and pooled behind a router. It processes a line of a file and then replies to whoever requested the processing, saying that it has finished with that line. Now on to the code.

First, the messages:

    import java.util.List;

    public class Messages {
        public static class ProcessFiles {
            public final List<String> fileNames;

            public ProcessFiles(List<String> fileNames) {
                this.fileNames = fileNames;
            }
        }

        public static class ProcessLine {
            public final String line;

            public ProcessLine(String line) {
                this.line = line;
            }
        }

        public static class LineProcessed {}

        // Shared, content-free acknowledgement
        public static final LineProcessed LINE_PROCESSED = new LineProcessed();
    }

And FilesProcessor :

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;

    import akka.actor.ActorRef;
    import akka.actor.UntypedActor;

    public class FilesProcessor extends UntypedActor {
        private List<String> files;
        private int awaitingCount;
        private ActorRef router;

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof Messages.ProcessFiles) {
                Messages.ProcessFiles pf = (Messages.ProcessFiles) msg;
                router = ... // look up the router
                // Mutable copy, because processNextFile() removes entries
                files = new LinkedList<String>(pf.fileNames);
                processNextFile();
            } else if (msg instanceof Messages.LineProcessed) {
                awaitingCount--;
                if (awaitingCount <= 0) {
                    processNextFile();
                }
            } else {
                unhandled(msg);
            }
        }

        private void processNextFile() {
            if (files.isEmpty()) {
                getContext().stop(getSelf());
                return;
            }
            String file = files.remove(0);
            awaitingCount = 0;
            try (BufferedReader in = openFile(file)) {
                String input;
                while ((input = in.readLine()) != null) {
                    router.tell(new Messages.ProcessLine(input), getSelf());
                    awaitingCount++;
                }
            } catch (IOException e) {
                e.printStackTrace();
                getContext().stop(getSelf());
                return;
            }
            if (awaitingCount == 0) {
                // Empty file: no acknowledgement will ever arrive, so move on
                processNextFile();
            }
        }

        private BufferedReader openFile(String name) {
            // do whatever to load the file
            ...
        }
    }

And LineProcessor :

    import akka.actor.UntypedActor;

    public class LineProcessor extends UntypedActor {
        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof Messages.ProcessLine) {
                Messages.ProcessLine pl = (Messages.ProcessLine) msg;
                // Do whatever line processing...
                getSender().tell(Messages.LINE_PROCESSED, getSelf());
            } else {
                unhandled(msg);
            }
        }
    }

Note that the line processor sends its response back with no additional content. You could, of course, change this if you needed to send something back based on the line processing. I'm sure this code is not bulletproof; I just wanted to show you the high-level concept of how you can implement this flow without request/response semantics and Futures.
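To make the counting idea easier to try in isolation, here is a minimal sketch that models it with plain JDK classes instead of Akka. It is not the code above and uses no Akka API: a thread pool stands in for the router, a BlockingQueue stands in for the FilesProcessor mailbox, and the names AckCountingSketch and processFiles are hypothetical. Files are represented as in-memory lists of lines, which is also an assumption made only for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// JDK-only model of the ack-counting idea; all names here are hypothetical.
final class AckCountingSketch {
    // Content-free acknowledgement, like LINE_PROCESSED in the answer
    private static final Object LINE_PROCESSED = new Object();

    /**
     * Processes each "file" (a list of lines) in order. A file is started only
     * after every line of the previous file has been acknowledged.
     * Returns the total number of acknowledgements received.
     */
    static long processFiles(List<List<String>> files, int workers, Consumer<String> lineHandler) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);   // stands in for the router
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();    // stands in for the coordinator's mailbox
        long totalAcks = 0;
        try {
            for (List<String> file : files) {
                int awaiting = 0;
                for (String line : file) {
                    pool.execute(() -> {
                        try {
                            lineHandler.accept(line);
                        } finally {
                            mailbox.add(LINE_PROCESSED); // acknowledge even if the handler failed
                        }
                    });
                    awaiting++;
                }
                // Only a counter is kept per file, not one future per line.
                while (awaiting > 0) {
                    mailbox.take();
                    awaiting--;
                    totalAcks++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for acknowledgements", e);
        } finally {
            pool.shutdown();
        }
        return totalAcks;
    }
}
```

Calling AckCountingSketch.processFiles(files, 4, handler) returns once every line of every file has been acknowledged. Unlike the actor version, this sketch blocks the calling thread while it waits; the actor version stays non-blocking because the acknowledgements arrive as ordinary messages.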

If you have any questions about this approach or would like more information, let me know and I would be happy to provide it.


Use context.setReceiveTimeout on the routees so that, when a routee goes idle, it sends a message back to the sender with the number of messages it has processed. When the total number of processed messages equals the number sent, the work is complete.

If your routees stay busy enough that the receive timeout does not fire often enough, schedule your own messages to poll them for their counts.
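The completion check in this answer (sum of processed counts equals the number sent) can be sketched without Akka as follows. This is only a JDK model under stated assumptions: single-threaded executors stand in for the routees, a shared AtomicLongArray stands in for the count messages the routees would send back, and a sleep loop stands in for the receive timeout or scheduled polling. The names CountPollingSketch and processAndAwait are hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Consumer;

// JDK-only model of "processed count == sent count"; all names are hypothetical.
final class CountPollingSketch {

    /** Sends every line to a routee, then polls until all lines are counted as processed. */
    static long processAndAwait(List<String> lines, int workers, long pollMillis, Consumer<String> handler) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        ExecutorService[] routees = new ExecutorService[workers];
        for (int i = 0; i < workers; i++) {
            routees[i] = Executors.newSingleThreadExecutor();
        }
        AtomicLongArray processed = new AtomicLongArray(workers); // one counter per routee
        long sent = 0;
        try {
            for (String line : lines) {
                int slot = (int) (sent % workers); // round-robin routing
                routees[slot].execute(() -> {
                    try {
                        handler.accept(line);
                    } finally {
                        processed.incrementAndGet(slot); // count the line even if the handler failed
                    }
                });
                sent++;
            }
            // Poll the counters; the work is complete when they add up to the number sent.
            while (sum(processed) < sent) {
                TimeUnit.MILLISECONDS.sleep(pollMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling counts", e);
        } finally {
            for (ExecutorService routee : routees) {
                routee.shutdown();
            }
        }
        return sum(processed);
    }

    private static long sum(AtomicLongArray counters) {
        long total = 0;
        for (int i = 0; i < counters.length(); i++) {
            total += counters.get(i);
        }
        return total;
    }
}
```

Compared with one acknowledgement per line, this trades reply traffic for latency: completion is noticed only at the next poll, so the poll interval bounds how late the next file can start.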


Source: https://habr.com/ru/post/1491041/

