Designing a generic interface for asynchronous operations

I am considering a generic, cancellable interface for asynchronous requests and responses. The requirements are as follows. It must:

  • Support asynchronous calls
  • Be cancellable
  • Be generic
  • Support request / response
  • Support either returning in the current thread or processing the response in another thread

So here is my first stab:

interface AsyncOperation<INPUT, OUTPUT> {
    Future<OUTPUT> execute(INPUT in, AsyncCallback<OUTPUT> callback);
}

interface AsyncCallback<OUTPUT> {
    void done(OUTPUT output);
}

Usage:

// completely async operation
operation.execute("Test", new AsyncCallback<String>() {
    public void done(String output) {
        // process result...
    }
});

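// sync operation with cancellation after timeout
Future<String> future = operation.execute("Test", null);
try {
    // Future.get with a timeout needs an explicit time unit
    String result = future.get(1000, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
    // true: interrupt the worker if the operation is already running
    future.cancel(true);
} catch (InterruptedException | ExecutionException ex) {
    // interrupted while waiting, or the operation itself failed
}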

Disadvantages:

  • It's complicated.
  • It supports only a single input parameter. I'm not too worried about this.
  • Having only a single "done" method means that exceptions must be passed through "done". This could be solved by having onSuccess and onException (and onFinally?) in AsyncCallback, but that would make it even more verbose.
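To make the last point concrete, here is a minimal sketch of what a split callback might look like. The names SplitCallback and SplitCallbacks are hypothetical and not part of the interfaces above, and the work runs synchronously only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical variant of AsyncCallback: success and failure are reported separately.
interface SplitCallback<OUTPUT> {
    void onSuccess(OUTPUT output);
    void onException(Throwable error);
    // Optional hook invoked after either outcome; does nothing by default.
    default void onFinally() {}
}

// Hypothetical helper that runs some work and routes the outcome to the callback.
final class SplitCallbacks {
    private SplitCallbacks() {}

    static <T> void complete(Callable<T> work, SplitCallback<T> callback) {
        T result;
        try {
            result = work.call();
        } catch (Exception e) {
            try { callback.onException(e); } finally { callback.onFinally(); }
            return;
        }
        try { callback.onSuccess(result); } finally { callback.onFinally(); }
    }
}

class SplitCallbackDemo {
    // Records the order in which the callback methods fire.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SplitCallback<String> cb = new SplitCallback<String>() {
            public void onSuccess(String output) { events.add("success:" + output); }
            public void onException(Throwable error) { events.add("error:" + error.getMessage()); }
            public void onFinally() { events.add("finally"); }
        };
        SplitCallbacks.complete(() -> "ok", cb);
        SplitCallbacks.complete(() -> { throw new IllegalStateException("boom"); }, cb);
        return events;
    }
}
```

The trade-off is the one described above: the error path becomes explicit, at the cost of three methods to implement instead of one.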

Google Protocol Buffers service methods follow a relatively similar model:

void [methodname](RpcController controller, 
    [RequestClass] request, RpcCallback<[ResponseClass]> callback);

Are there any better ideas?


Do you need the INPUT type parameter at all? The operation can simply capture its input, for example:

void greet(final String name) {
    new AsyncOperation<Object>() {
        @Override Object doIt() {
            System.out.println("Hello " + name + "!");
            return null; // nothing to return; the declared return type is Object
        }
    }.execute(null);
}
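The snippet above does not show the base class it relies on. As a rough sketch of one way it could be backed, assuming a hypothetical abstract class CapturingOperation that takes an executor and returns a Future instead of taking a callback:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical base class: no INPUT type parameter, subclasses capture what they need.
abstract class CapturingOperation<OUTPUT> {
    // Subclasses put the actual work here.
    abstract OUTPUT doIt() throws Exception;

    // Runs doIt() on the given executor; the Future allows waiting or cancelling.
    Future<OUTPUT> execute(ExecutorService executor) {
        Callable<OUTPUT> task = this::doIt;
        return executor.submit(task);
    }
}

class CapturingOperationDemo {
    static String greet(final String name) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The anonymous subclass captures 'name' from the enclosing method.
            Future<String> future = new CapturingOperation<String>() {
                @Override String doIt() { return "Hello " + name + "!"; }
            }.execute(executor);
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling CapturingOperationDemo.greet("World") waits for the background task and returns its greeting string.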

Take a look at jetlang. It supports a request/reply model. Here is an example from its tests:

@Test
public void simpleRequestResponse() throws InterruptedException {
    Fiber req = new ThreadFiber();
    Fiber reply = new ThreadFiber();
    req.start();
    reply.start();
    RequestChannel<String, Integer> channel = new MemoryRequestChannel<String, Integer>();
    Callback<Request<String, Integer>> onReq = new Callback<Request<String, Integer>>() {
        public void onMessage(Request<String, Integer> message) {
            assertEquals("hello", message.getRequest());
            message.reply(1);
        }
    };
    channel.subscribe(reply, onReq);

    final CountDownLatch done = new CountDownLatch(1);
    Callback<Integer> onReply = new Callback<Integer>() {
        public void onMessage(Integer message) {
            assertEquals(1, message.intValue());
            done.countDown();
        }
    };
    AsyncRequest.withOneReply(req, channel, "hello", onReply);
    assertTrue(done.await(10, TimeUnit.SECONDS));
    req.dispose();
    reply.dispose();
}

Well, sucks to answer my own question, but I came up with a set of classes that work well for this:

// the main Async operation interface
public interface AsyncOperation<OUTPUT, INPUT> {
    public AsyncController execute(INPUT input, 
        AsyncCallback<OUTPUT> callback);
}

// the callback that gets called when the operation completes
public interface AsyncCallback<OUTPUT> {
    public void done(AsyncResult<OUTPUT> output);
}

// provides the ability to cancel operations
public interface AsyncController {
    void cancel();
}

// this provides a convenient way to manage a response that is either a
// value or an exception
public class AsyncResult<VALUE> {

    private final VALUE value;
    private final Throwable throwable;

    private AsyncResult(VALUE value, Throwable throwable) {
        this.value = value;
        this.throwable = throwable;
    }

    public AsyncResult(VALUE value) {
        this(value, null);
    }

    public AsyncResult(Throwable throwable) {
        this((VALUE) null, throwable);
    }

    public VALUE value() throws Throwable {
        if(throwable != null) {
            throw throwable;
        } else {
            return value;
        }
    }
}
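For illustration, here is a sketch of how these interfaces might be implemented on top of an ExecutorService. ExecutorOperation and the demo class are hypothetical names, and the interfaces are restated in compact form only so that the example compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Compact restatement of the types above.
interface AsyncController { void cancel(); }
interface AsyncCallback<OUTPUT> { void done(AsyncResult<OUTPUT> output); }
interface AsyncOperation<OUTPUT, INPUT> {
    AsyncController execute(INPUT input, AsyncCallback<OUTPUT> callback);
}
final class AsyncResult<VALUE> {
    private final VALUE value;
    private final Throwable throwable;
    private AsyncResult(VALUE value, Throwable throwable) { this.value = value; this.throwable = throwable; }
    public AsyncResult(VALUE value) { this(value, null); }
    public AsyncResult(Throwable throwable) { this((VALUE) null, throwable); }
    public VALUE value() throws Throwable {
        if (throwable != null) throw throwable;
        return value;
    }
}

// Hypothetical adapter: runs a plain function on an executor and reports via the callback.
final class ExecutorOperation<OUTPUT, INPUT> implements AsyncOperation<OUTPUT, INPUT> {
    private final ExecutorService executor;
    private final Function<INPUT, OUTPUT> work;

    ExecutorOperation(ExecutorService executor, Function<INPUT, OUTPUT> work) {
        this.executor = executor;
        this.work = work;
    }

    @Override
    public AsyncController execute(INPUT input, AsyncCallback<OUTPUT> callback) {
        Future<?> future = executor.submit(() -> {
            AsyncResult<OUTPUT> result;
            try {
                result = new AsyncResult<OUTPUT>(work.apply(input));
            } catch (Throwable t) {
                result = new AsyncResult<OUTPUT>(t);
            }
            callback.done(result);
        });
        // Cancelling interrupts a running task or stops a queued one from starting.
        return () -> future.cancel(true);
    }
}

class ExecutorOperationDemo {
    static String run() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            AsyncOperation<Integer, String> length =
                new ExecutorOperation<Integer, String>(executor, String::length);
            CountDownLatch done = new CountDownLatch(1);
            AtomicReference<AsyncResult<Integer>> holder = new AtomicReference<>();
            length.execute("Test", result -> { holder.set(result); done.countDown(); });
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
            return "length=" + holder.get().value();
        } catch (Throwable t) {
            throw new RuntimeException(t);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One thing to keep in mind with this sketch: if an operation is cancelled before it starts, the callback is never invoked, so callers that need a notification on cancellation would have to add one.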

Source: https://habr.com/ru/post/1764396/
