Akka distributed pub-sub: Java implementation does not work

Subscriber core class: Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.SubscriberActor;

/**
 * Entry point of the subscriber node.
 *
 * <p>Boots the Spring context, obtains the {@link ActorSystem} bean from it,
 * creates the {@link SubscriberActor} under the name {@code "subscriber"} and
 * registers it with the distributed pub-sub mediator.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Starts the application and registers the subscriber actor.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws InterruptedException if the final sleep is interrupted
     */
    public static void main(String[] args) throws InterruptedException {

        ApplicationContext ctx = SpringApplication.run(Application.class, args);
        // get hold of the actor system
        ActorSystem system = ctx.getBean(ActorSystem.class);
        ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
        ActorRef subscriber = system.actorOf(
                Props.create(SubscriberActor.class), "subscriber");
        // Register the actor with the mediator under its path
        // ("/user/subscriber") so that Send/SendToAll messages addressed to
        // that path can reach it. Note: Put is a path registration, it is NOT
        // a topic subscription (that would be DistributedPubSubMediator.Subscribe).
        mediator.tell(new DistributedPubSubMediator.Put(subscriber), subscriber);
        System.out.println("Running.");
        Thread.sleep(5000L);
    }
}

Subscriber: SubscriberActor.java

package com.mynamespace.actors;

import java.util.ArrayList;
import java.util.List;

import akka.actor.UntypedActor;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

/**
 * Actor that answers {@link CategoryServiceRequest} messages with a fixed
 * list of categories and acknowledges the string {@code "init"} on stdout.
 * Any other message is only reported on stdout.
 */
public class SubscriberActor extends UntypedActor {

    /**
     * Dispatches an incoming message to the matching handler.
     *
     * @param msg the received message
     */
    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CategoryServiceRequest) {
            replyWithCategories();
            return;
        }
        if ("init".equals(msg)) {
            System.out.println("init called");
            return;
        }
        System.out.println("Unhandelled message received for getCategories.");
    }

    /** Builds the fixed category response and sends it back to the current sender. */
    private void replyWithCategories() {
        System.out.println("Request received for GetCategories.");
        CategoryServiceResponse reply = new CategoryServiceResponse();
        List<String> names = new ArrayList<>();
        names.add("Food");
        names.add("Fruits");
        reply.setCatgories(names);
        getSender().tell(reply, getSelf());
    }

}

Application.conf for Subscriber

akka {
  # Logging: INFO level, routed through SLF4J.
  loglevel = INFO
  stdout-loglevel = INFO
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # Load the distributed pub-sub extension when the actor system starts.
  extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]

  actor {
    # Cluster-aware actor references are required for the pub-sub mediator.
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      # 0 = let Akka pick a free port, so this node is never itself one of
      # the seed nodes listed below.
      port = 0
    }
  }

  cluster {
    # The actor system name in these addresses must match the name of the
    # ActorSystem created by the application, and a seed node must already be
    # listening on one of these ports for this node to join the cluster.
    seed-nodes = [
      "akka.tcp://mynamespace-actor-system@127.0.0.1:2551",
      "akka.tcp://mynamespace-actor-system@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }

}

Publisher core class: Application.java

package com.mynamespace;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ComponentScan;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;

import com.mynamespace.actors.PublisherActor;

/**
 * Entry point of the publisher node.
 *
 * <p>Starts Spring Boot, creates the {@link PublisherActor} named
 * {@code "publisher"}, registers it with the distributed pub-sub mediator and,
 * after a fixed five-second pause, sends it the string {@code "hi"}.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.mynamespace.*")
public class Application {

    /**
     * Boots the node and sends the initial message to the publisher actor.
     *
     * @param args command-line arguments, passed through to Spring Boot
     * @throws InterruptedException if the pause before sending is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        // The actor system is provided as a bean by the Spring context.
        final ActorSystem actorSystem = SpringApplication
            .run(Application.class, args)
            .getBean(ActorSystem.class);

        final ActorRef pubSubMediator =
            DistributedPubSubExtension.get(actorSystem).mediator();
        final ActorRef publisherRef =
            actorSystem.actorOf(Props.create(PublisherActor.class), "publisher");

        pubSubMediator.tell(
            new DistributedPubSubMediator.Put(publisherRef), publisherRef);

        // Fixed pause before the first message is sent.
        // NOTE(review): presumably meant to give the cluster time to form
        // before sending — confirm; a fixed delay gives no such guarantee.
        Thread.sleep(5000);

        publisherRef.tell("hi", publisherRef);
        System.out.println("Running.");
    }
}

PublisherActor.java

package com.mynamespace.actors;

import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import akka.util.Timeout;

import com.mynamespace.message.CategoryServiceRequest;
import com.mynamespace.message.CategoryServiceResponse;

public class PublisherActor extends UntypedActor {

    // activate the extension
    ActorRef mediator = DistributedPubSubExtension.get(getContext().system())
        .mediator();

    public void onReceive(Object msg) {
        if (msg instanceof String) {
            Timeout timeOut = new Timeout(50000l);
            mediator.tell(new DistributedPubSubMediator.Send(
                    "/user/subscriber", new CategoryServiceRequest()),
                    getSelf());
            Future<Object> response = Patterns.ask(mediator,
                    new DistributedPubSubMediator.Send("/user/subscriber",
                            new CategoryServiceRequest()), timeOut);
            Future<CategoryServiceResponse> finalresponse = response.map(
                    new Mapper<Object, CategoryServiceResponse>() {

                        @Override
                        public CategoryServiceResponse apply(Object parameter) {
                            CategoryServiceResponse responseFromRemote = (CategoryServiceResponse) parameter;
                            System.out.println("received:: list of size:: "
                                + responseFromRemote.getCatgories().size());
                            return responseFromRemote;
                        }

                    }, getContext().system().dispatcher());
        } else if (msg instanceof DistributedPubSubMediator.SubscribeAck) {
            System.out.println("subscribbed.......");

        } else {
            unhandled(msg);
        }
    }
}

The application.conf for the publisher is the same as for the subscriber. Both run on different ports on the same machine.

I have two seed nodes defined and running on my local system. However, I am not able to ASK/TELL the subscriber from the publisher (both run on different nodes) through the DistributedPubSub mediator.

After running the subscriber and then the publisher, I don't get any exceptions or dead-letter messages printed in stdout or the logs.

Is there a way to see which actor references are registered with my mediator?

.

+4
1

, @spam Publish/Subscribe sendOneMessageToEachGroup=true.

, Send ? , . , , - ( , , )

, , , , ,

0

Source: https://habr.com/ru/post/1598855/


All Articles