My application has a list, publisherPostListenerList, that receives real-time user posts from a RabbitMQ queue so they can be pushed to subscribers (consumers). The list is a field of the ApplicationEventListener class, which listens for pub/sub queue events. The controller method below reads the list through its getter and, based on some logic, pushes the posts to subscribers.
The flow is as follows:
The user writes a post → the post is saved to the DB and published to the queue → the post from the queue is added to publisherPostListenerList, from which it is forwarded to the user's subscribers.
As you can see, publisherPostListenerList is a single list shared by n concurrent requests, because ApplicationEventListener is a singleton. With a single instance the setup works fine, but it will not work in a clustered environment, because each node will have its own publisherPostListenerList.
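To make the clustering problem concrete, here is a minimal sketch. NodeSketch and ClusterListDemo are hypothetical stand-ins, not part of my application, and the sketch assumes the queue hands each message to only one of the nodes:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ClusterListDemo {
    public static void main(String[] args) {
        NodeSketch nodeA = new NodeSketch();
        NodeSketch nodeB = new NodeSketch();

        // Assumption: the queue delivers the message to one consumer only, here node A.
        nodeA.onPost("post-1");

        // A subscriber whose request is routed to node B finds nothing to send.
        System.out.println("node A sees: " + nodeA.pendingPosts());
        System.out.println("node B sees: " + nodeB.pendingPosts());
    }
}

// Hypothetical stand-in for one application node.
class NodeSketch {
    // Each node holds its own in-memory list, like publisherPostListenerList.
    private final List<String> publisherPostListenerList = new CopyOnWriteArrayList<>();

    // Called when this node consumes a message from the queue.
    void onPost(String post) {
        publisherPostListenerList.add(post);
    }

    // Called when a subscriber's request is served by this node.
    List<String> pendingPosts() {
        return publisherPostListenerList;
    }
}
```

Under that assumption, node A prints the post and node B prints an empty list, which is the behaviour I want to avoid.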
How can I handle this situation? I can't make the ApplicationEventListener class stateless: I need a list to hold the posts received from the queue. Should I put the list in a distributed cache? Or is there some other standard way?
ApplicationEventListener.java
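    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    import org.apache.log4j.Logger; // assumption: log4j 1.x, which matches Logger.getLogger(Class)
    import org.springframework.context.event.EventListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ApplicationEventListener {

        // Shared by all requests on this node; holds posts received from the queue.
        private List<Post> publisherPostListenerList = new CopyOnWriteArrayList<Post>();

        private static final Logger logger = Logger.getLogger(ApplicationEventListener.class);

        // Adds every submitted post to the in-memory list.
        @EventListener
        public void postSubmissionEventHandler(PostSubmissionEvent event) throws IOException {
            Post post = event.getPost();
            logger.debug("application published user post received " + post);
            publisherPostListenerList.add(post);
        }

        public List<Post> getPublisherPostListenerList() {
            return publisherPostListenerList;
        }

        public void setPublisherPostListenerList(List<Post> publisherPostListenerList) {
            this.publisherPostListenerList = publisherPostListenerList;
        }
    }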
Controller method for pushing a message to a subscriber
    @RequestMapping(value = "/getRealTimeServerPushUserPosts")
    public SseEmitter getRealTimeServerPushUserPosts(@RequestParam("userId") int userId) {
        SseEmitter sseEmitter = new SseEmitter();
        CustomUserDetail myUserDetails = currentUserAccessor.getCurrentLoggedInUser();
        User loggedInUser = myUserDetails.getUser();

        // IDs of the publishers that the logged-in user follows.
        List<Integer> userPublisherIDList = this.userService.loadUserPublisherIdListWhichLoggedInUserFollows(loggedInUser);
        // The shared list filled by ApplicationEventListener.
        List<Post> postList = eventListener.getPublisherPostListenerList();

        for (Integer userPublisherId : userPublisherIDList) {
            for (Post post : postList) {
                // Send only posts written by a publisher this user follows.
                if (userPublisherId.intValue() == post.getUser().getUserId().intValue()) {
                    try {
                        sseEmitter.send(post);
                        // Removes the post from the shared list once it has been sent.
                        postList.remove(post);
                    } catch (IOException e) {
                        logger.error(e);
                    }
                }
            }
        }
        return sseEmitter;
    }
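A note on the loop above: removing from the list during the for-each only works because the list is a CopyOnWriteArrayList, whose iterator walks a snapshot of the array. A minimal sketch of that behaviour, with SnapshotRemovalDemo and drain as hypothetical names and plain strings standing in for Post objects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotRemovalDemo {
    // Collects and removes every element equal to target while iterating,
    // mirroring the shape of the controller loop.
    static List<String> drain(List<String> shared, String target) {
        List<String> sent = new ArrayList<>();
        for (String post : shared) {      // iterates over a snapshot of the array
            if (post.equals(target)) {
                sent.add(post);           // stands in for sseEmitter.send(post)
                shared.remove(post);      // modifies the live list, not the snapshot
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> shared = new CopyOnWriteArrayList<>(List.of("a", "b", "a"));
        List<String> sent = drain(shared, "a");
        System.out.println("sent: " + sent);   // sent: [a, a]
        System.out.println("left: " + shared); // left: [b]
    }
}
```

With a plain ArrayList the same loop would typically throw ConcurrentModificationException. Since the removal hits the shared list, a post that has been sent to one follower appears to be no longer available to any other request.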