Java: multi-queue executing service

Requirements:

  • I have messages grouped by different types, for example Type1, Type2 ... Type100.
  • I want to execute different types of messages in parallel. Let them speak in 10 threads, but all messages of the same type must be executed one after another. The order of execution does not matter.
  • As soon as the thread completes all messages TypeX. He should start processing another type.

I looked at different answers: Most of them offer artist services for multithreading processing. Let them say that we are creating an artist service, for example

ExecutorService executorService = Executors.newFixedThreadPool(10);

but as soon as we send a message using executorService.submit(runnableMessage);

We do not gain any control over the assignment of a specific type of message for a specific flow only.

Decision:

creation of an array of single-threaded performers

ExecutorService[] pools = new ExecutorService[10];

Type1, Type2... Type10 , - , Type11 , .

?

- - , ?

+4
3

:

, . :

. Group1 type1

class MessageGroup implements Runnable {
    String type;
    String List<Message> messageList;

    @Override
    public void run() {
      for(Message message : MessageList) {
         message.process();
      }
    }
} 

- ,

ExecutorService executorService = Executors.newFixedThreadPool(10); 

,

executorService.submit(runnableGroup);

.

+1

Akka. , . ExecutorService, , JDK, .

ExecutionServices , One ExecutionService . -, ExecutionService, , , .

, . , . Akka . , , , , , .

:

Maven Akka.

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.17</version>
    </dependency>

Java 8. Java, IDE.

package com.softwaremosaic.demos.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


public class ActorDemo {

    public static void main( String[] args ) throws InterruptedException {
        // The following partitioner will spread the requests over
        // multiple actors, which I chose to demonstrate the technique.
        // You will need to change it to one that better maps the the
        // jobs to your use case.   Remember that jobs that get mapped
        // to the same key, will get executed in serial (probably
        // but not necessarily) by the same thread.
        ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );

        for ( int i=0; i<100; i++ ) {
            int id = i;
            exectorService.submit( () -> System.out.println("JOB " + id) );
        }

        exectorService.shutdown();
        exectorService.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

}


class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem                              actorSystem;
    private final Function<Runnable, String>               partitioner;
    private final ConcurrentHashMap<String,ActorRef>       actors = new ConcurrentHashMap<>();

    public ActorExecutionService( Function<Runnable,String> partitioner ) {
        this.actorSystem = ActorSystem.create("demo");
        this.partitioner = partitioner;
    }


    public void execute( Runnable command ) {
        String partitionKey = partitioner.apply( command );

        ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );

        actorRef.tell( command, actorRef );
    }

    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
    }


    public void shutdown() {
        actorSystem.terminate();
    }

    public List<Runnable> shutdownNow() {
        actorSystem.terminate();

        try {
            awaitTermination( 1, TimeUnit.MINUTES );
        } catch ( InterruptedException e ) {
            throw new RuntimeException( e );
        }

        return Collections.emptyList();
    }

    public boolean isShutdown() {
        return actorSystem.isTerminated();
    }

    public boolean isTerminated() {
        return actorSystem.isTerminated();
    }

    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        actorSystem.awaitTermination();

        return actorSystem.isTerminated();
    }
}

 class ExecutionServiceActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof Runnable) {
            ((Runnable) message).run();
        } else {
            unhandled(message);
        }
    }
}

NB. 1-100 undefined. - ( Akka ) . , . , , Akka, , .

+5

, . , 10 ArrayDeques, "Typ". 10 ScheduledExecutors. 5 , 200 . " X: null", .

. 200 , . , .

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Messages {

    public static void main(String[] args) {

        Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
        long initialDelay = 5000;
        long period = 200;

        // create 10 Queues, indexed by the type
        // create 10 executor-services, focused on their message queue
        for(int i=1; i<11; i++) {
            String type = "Type" + i;

            Runnable task = () -> System.out.println(
                     "current message of " + type + ": " + messages.get(type).poll()
            );

            messages.put(type, new ArrayDeque<String>());
            service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
        }

    }
}
+2

Source: https://habr.com/ru/post/1674600/


All Articles