How to read a message in Netty from another class

I want to read a message at a specific position in a class other than the InboundHandler. I cannot find a way to read it other than in the channelRead0 method, which is called by the Netty framework.

For instance:

    context.writeMessage("message");
    String msg = context.readMessage();

If this is not possible, how can I match the result that I get in the channelRead0 method with the specific call that I made in another class?

1 answer

Netty is an asynchronous framework. Using this paradigm, it can handle a large number of connections with minimal thread usage. If you are creating an API that uses Netty to dispatch calls to a remote location, you should use the same paradigm for your calls.

Instead of having your API return a direct value, make it return a Future<?> or a Promise<?>. There are various ways to implement this in your application; the easiest is to create a custom handler that maps incoming responses to Promises held in a FIFO queue.
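The FIFO idea can be sketched without Netty at all. In the sketch below, the JDK's CompletableFuture stands in for Netty's Promise, and the names PendingResponses, register and complete are hypothetical, chosen only for illustration. It assumes the remote side answers strictly in the order the requests were sent.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// JDK-only stand-in: CompletableFuture plays the role of Netty's Promise.
class PendingResponses {
    // The oldest unanswered request sits at the head of the queue.
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Call when a request is written; returns the future for its response.
    // Registering and writing should happen under one lock so both orders agree.
    CompletableFuture<String> register() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Call from the inbound handler for each decoded response.
    // Returns false if a response arrived while no request was waiting.
    boolean complete(String response) {
        CompletableFuture<String> future = pending.poll();
        if (future == null) {
            return false;
        }
        future.complete(response);
        return true;
    }

    public static void main(String[] args) {
        PendingResponses responses = new PendingResponses();
        CompletableFuture<String> user = responses.register();
        CompletableFuture<String> pass = responses.register();

        // Responses are matched to requests purely by arrival order.
        responses.complete("331 Password required");
        responses.complete("230 Logged in");

        System.out.println(user.join()); // 331 Password required
        System.out.println(pass.join()); // 230 Logged in
    }
}
```

If the protocol can reorder responses or send unsolicited messages, arrival order is no longer enough and some request identifier would be needed instead.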

An example would be the following:

This is largely based on an answer I posted in the past.

Let's start with a handler in our pipeline that maps incoming responses to the pending requests:

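    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.concurrent.Promise;
    import java.util.Queue;

    public class MyLastHandler extends SimpleChannelInboundHandler<String> {
        // Must be a real FIFO queue (e.g. ConcurrentLinkedQueue);
        // a SynchronousQueue has no capacity, so remove() would always fail.
        private final Queue<Promise<String>> queue;

        public MyLastHandler(Queue<Promise<String>> queue) {
            super();
            this.queue = queue;
        }

        // The following is called messageReceived(ChannelHandlerContext, String) in 5.0.
        @Override
        public void channelRead0(ChannelHandlerContext ctx, String msg) {
            // remove() throws NoSuchElementException if no request is pending
            this.queue.remove().setSuccess(msg); // Or setFailure(Throwable)
        }
    }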

Then we need to have a way to send commands to the remote server:

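    Channel channel = ....;
    Queue<Promise<String>> queue = ....; // the same queue instance given to MyLastHandler

    public Future<String> sendCommandAsync(String command) {
        // DefaultPromise has no public no-arg constructor; ask the event loop for a promise
        return sendCommandAsync(command, channel.eventLoop().<String>newPromise());
    }

    public Future<String> sendCommandAsync(String command, Promise<String> promise) {
        // Queue the promise and write the command together so both keep the same order
        synchronized (channel) {
            queue.offer(promise);
            channel.write(command);
        }
        channel.flush();
        return promise;
    }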

Now that we have our methods, we need a way to call them:

    sendCommandAsync("USER anonymous",
        channel.eventLoop().<String>newPromise().addListener(
            (Future<String> f) -> {
                String response = f.get();
                if (response.startsWith("331")) {
                    // do something
                }
                // etc
            }
        )
    );

If the caller wants to use our API as a blocking call, they can also do this:

    String response = sendCommandAsync("USER anonymous").get();
    if (response.startsWith("331")) {
        // do something
    }
    // etc

Note that Future.get() can throw an InterruptedException if the calling thread is interrupted, unlike a blocking socket read, which can only be cancelled by some interaction on the socket. This exception should not be a problem inside a FutureListener.
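For the blocking style, one possible way to deal with interruption is sketched below. The helper BlockingResponse.awaitResponse is a hypothetical name, and the timeout is an added assumption that the answer above does not require. The sketch uses only java.util.concurrent.Future so that it runs without Netty; Netty's Future extends that interface, so the same get(timeout, unit) call is available there.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingResponse {
    // Waits up to timeoutMillis for the response.
    // Returns an empty Optional if the wait timed out or the thread was interrupted.
    static Optional<String> awaitResponse(Future<String> future, long timeoutMillis) {
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return Optional.empty();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still see it.
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            // The request itself failed; surface the cause as an unchecked exception.
            throw new CompletionException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> answered =
                CompletableFuture.completedFuture("331 Password required");
        CompletableFuture<String> unanswered = new CompletableFuture<>();

        System.out.println(awaitResponse(answered, 100));   // Optional[331 Password required]
        System.out.println(awaitResponse(unanswered, 100)); // Optional.empty
    }
}
```

A blocking wait like this belongs on an application thread. Waiting on a response from inside the channel's own event loop would stall the very thread that has to deliver it.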


Source: https://habr.com/ru/post/1273743/

