Akka. , . ExecutorService, , JDK, .
ExecutionServices , One ExecutionService . -, ExecutionService, , , .
, . , . Akka . , , , , , .
:
Add the Akka dependency to the Maven build:
<dependency>
<!-- Akka actor module. NOTE(review): the "_2.11" artifact suffix is presumably the Scala
     binary version the artifact was built for; confirm it matches any other Scala
     artifacts on the classpath. -->
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>
Java 8. Java, IDE.
package com.softwaremosaic.demos.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/**
 * Small driver for {@link ActorExecutionService}: submits one hundred print jobs,
 * requests shutdown, waits up to a minute for termination and then prints
 * {@code DONE}.
 */
public class ActorDemo {

    /**
     * Runs the demo.
     *
     * @param args ignored
     * @throws InterruptedException if the calling thread is interrupted while waiting
     *                              for the executor to terminate
     */
    public static void main( String[] args ) throws InterruptedException {
        // Each submitted job is routed by the string form of its own hash code.
        Function<Runnable, String> byHashCode = job -> job.hashCode() + "";
        ExecutorService executor = new ActorExecutionService( byHashCode );

        int jobId = 0;
        while ( jobId < 100 ) {
            executor.submit( printJob( jobId ) );
            jobId++;
        }

        executor.shutdown();
        executor.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

    /** Builds the task that prints the label for the job with the given id. */
    private static Runnable printJob( int id ) {
        return () -> System.out.println( "JOB " + id );
    }
}
/**
 * An {@link java.util.concurrent.ExecutorService} that runs tasks on Akka actors.
 *
 * <p>Every task is mapped to a partition key by the supplied partitioner. One actor is
 * created lazily per distinct key (the key is used as the actor name), and the task is
 * delivered to that actor as a message.
 *
 * <p>Lifecycle, following the {@code ExecutorService} contract:
 * <ul>
 *   <li>{@link #shutdown()} stops accepting new tasks, lets tasks that were already
 *       accepted finish, and only then terminates the actor system;</li>
 *   <li>{@link #shutdownNow()} terminates the actor system immediately;</li>
 *   <li>{@link #awaitTermination(long, TimeUnit)} honours its timeout.</li>
 * </ul>
 *
 * <p>NOTE(review): partition keys are passed to Akka as actor names, so they presumably
 * have to satisfy Akka's actor-name rules — confirm for custom partitioners.
 */
class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem actorSystem;
    private final Function<Runnable, String> partitioner;
    private final ConcurrentHashMap<String, ActorRef> actors = new ConcurrentHashMap<>();

    /** Released by the actor system's termination callback. */
    private final CountDownLatch terminated = new CountDownLatch( 1 );

    /** Guards {@link #shutdown} and {@link #pendingTasks}. */
    private final Object lifecycleLock = new Object();
    private boolean shutdown;
    /** Tasks accepted by {@link #execute(Runnable)} that have not finished running yet. */
    private long pendingTasks;

    /**
     * Creates the executor together with its own actor system named {@code "demo"}.
     *
     * @param partitioner maps a task to the key of the actor that must run it
     * @throws NullPointerException if {@code partitioner} is null
     */
    public ActorExecutionService( Function<Runnable, String> partitioner ) {
        // Validate before creating the actor system so a bad argument does not leak one.
        if ( partitioner == null ) {
            throw new NullPointerException( "partitioner" );
        }
        this.partitioner = partitioner;
        this.actorSystem = ActorSystem.create( "demo" );

        // Typed as Runnable explicitly so the Runnable overload of registerOnTermination is chosen.
        Runnable onTerminated = terminated::countDown;
        this.actorSystem.registerOnTermination( onTerminated );
    }

    /**
     * Hands {@code command} to the actor responsible for its partition.
     *
     * @param command the task to run
     * @throws NullPointerException       if {@code command} is null
     * @throws RejectedExecutionException if shutdown has already been requested
     */
    @Override
    public void execute( Runnable command ) {
        if ( command == null ) {
            throw new NullPointerException( "command" );
        }
        String partitionKey = partitioner.apply( command );

        // Check-and-count atomically with respect to shutdown(), so a task is either
        // rejected or guaranteed to be counted before the system may terminate.
        synchronized ( lifecycleLock ) {
            if ( shutdown ) {
                throw new RejectedExecutionException( "executor has been shut down" );
            }
            pendingTasks++;
        }

        try {
            ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );
            Runnable tracked = () -> {
                try {
                    command.run();
                } finally {
                    taskFinished();
                }
            };
            actorRef.tell( tracked, ActorRef.noSender() );
        } catch ( RuntimeException | Error e ) {
            // The task never reached a mailbox; undo the accounting before propagating.
            taskFinished();
            throw e;
        }
    }

    /** Creates the actor that serves the given partition; the key doubles as the actor name. */
    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create( ExecutionServiceActor.class ), partitionKey );
    }

    /** Records completion of one task and terminates the system once a requested shutdown has drained. */
    private void taskFinished() {
        boolean drained;
        synchronized ( lifecycleLock ) {
            pendingTasks--;
            drained = shutdown && pendingTasks == 0;
        }
        if ( drained ) {
            actorSystem.terminate();
        }
    }

    /**
     * Initiates an orderly shutdown: no new tasks are accepted, tasks accepted earlier
     * still run, and the actor system is terminated once the last of them has finished.
     * Calling it again has no additional effect.
     */
    @Override
    public void shutdown() {
        boolean idle;
        synchronized ( lifecycleLock ) {
            shutdown = true;
            idle = pendingTasks == 0;
        }
        if ( idle ) {
            actorSystem.terminate();
        }
    }

    /**
     * Stops accepting tasks and terminates the actor system right away, without waiting;
     * use {@link #awaitTermination(long, TimeUnit)} to wait for completion.
     *
     * @return always an empty list: tasks still queued in actor mailboxes are discarded
     *         and cannot be handed back to the caller
     */
    @Override
    public List<Runnable> shutdownNow() {
        synchronized ( lifecycleLock ) {
            shutdown = true;
        }
        actorSystem.terminate();
        return Collections.emptyList();
    }

    /** @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called */
    @Override
    public boolean isShutdown() {
        synchronized ( lifecycleLock ) {
            return shutdown;
        }
    }

    /** @return {@code true} once the actor system has run its termination callback */
    @Override
    public boolean isTerminated() {
        return terminated.getCount() == 0;
    }

    /**
     * Blocks until the actor system has terminated, the timeout elapses, or the calling
     * thread is interrupted, whichever happens first.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the system terminated, {@code false} if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        return terminated.await( timeout, unit );
    }
}
class ExecutionServiceActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Runnable) {
((Runnable) message).run();
} else {
unhandled(message);
}
}
}
NB. 1-100 undefined. - ( Akka ) . , . , , Akka, , .