Implementing coroutines in Java

This question is related to my question about existing coroutine implementations in Java. If, as I suspect, it turns out that there is currently no full implementation of coroutines available in Java, what would be required to implement them?

As I said in that question, I know about the following:

  • You can implement "coroutines" as threads/thread pools behind the scenes.
  • You can do tricky things with JVM bytecode behind the scenes to make coroutines possible.
  • The so-called "Da Vinci Machine" JVM implementation has primitives that make coroutines doable without bytecode manipulation.
  • There are also various JNI approaches to coroutines.

I'll address the deficiencies of each in turn.

Thread-based coroutines

This "solution" is pathological. The whole point of coroutines is to avoid the overhead of threading, locking, kernel scheduling, etc. Coroutines are supposed to be light and fast and to execute only in user space. Implementing them in terms of full-tilt threads with tight restrictions gets rid of all the advantages.

JVM bytecode manipulation

This solution is more practical, albeit a bit difficult to pull off. It is roughly the same as dropping down into assembly language for coroutine libraries in C (which is how many of them work), with the advantage that you have only one architecture to worry about and get right.

It also ties you down to running your code only on fully compliant JVM stacks (which means, for example, no Android) unless you can find a way to do the same thing on the non-compliant stack. If you do find a way to do this, however, you have now doubled your system complexity and testing needs.

Da Vinci Machine

Da Vinci is cool for experimentation, but since it is not a standard JVM, its features will not be available everywhere. Indeed, I suspect most production environments would specifically forbid the use of the Da Vinci Machine. Thus I could use it for cool experiments, but not for any code I expect to release to the real world.

It also has the added problem, similar to the JVM bytecode manipulation solution above, that it will not work on alternative stacks (like Android's).

JNI implementation

This solution makes the point of doing this in Java at all moot. Each combination of CPU and operating system requires independent testing, and each is a potential source of frustrating, subtle failures. Alternatively, of course, I could tie myself down to one platform entirely, but this, too, makes the point of doing things in Java entirely moot.

So...

Is there any way to implement coroutines in Java without using one of these four techniques? Or will I be forced to use the one of those four that smells the least (JVM bytecode manipulation) instead?




Edited to add:

Just to ensure that confusion is contained: this question is related to my other one, but it is not the same. That one is looking for an existing implementation, to avoid reinventing the wheel unnecessarily. This one asks how one would go about implementing coroutines in Java should the other prove unanswerable. The intent is to keep different questions in different threads.

+57
java coroutine kotlin-coroutines
May 17 '10 at 4:26
7 answers

I would take a look at this: http://www.chiark.greenend.org.uk/~sgtatham/coroutines.html , it's pretty interesting and should be a good place to start. But of course we are using Java, so we can do better (or maybe worse, because there are no macros :))

From my understanding of coroutines, you usually have a producer and a consumer coroutine (or at least this is the most common pattern). But semantically you don't want the producer to call the consumer or vice versa, because this introduces an asymmetry. Given the way stack-based languages work, though, we will need to have someone do the calling.

So here is a very simple type hierarchy:

    public interface CoroutineProducer<T> {
        public T Produce();
        public boolean isDone();
    }

    public interface CoroutineConsumer<T> {
        public void Consume(T t);
    }

    public class CoroutineManager {
        // Drives the pair: pulls one value from the producer and hands it to the consumer.
        public static <T> void Execute(CoroutineProducer<T> prod, CoroutineConsumer<T> con) {
            while (!prod.isDone()) { // really simple
                T d = prod.Produce();
                con.Consume(d);
            }
        }
    }

Now of course the hard part is implementing the interfaces; in particular, it is difficult to break a computation into individual steps. For this you would probably want a whole other set of persistent control structures. The basic idea is that we want to simulate a non-local transfer of control (in the end it's kind of like we're simulating a goto). We basically want to move away from using the stack and the pc (program counter) by keeping the state of our current operations on the heap instead of on the stack. Therefore we are going to need a bunch of helper classes.

For example:

Let's say that in an ideal world you wanted to write a producer that looked like this (pseudocode):

    boolean is_done;
    int other_state;
    while (!is_done) {
        // read input
        // parse input
        // yield input to coroutine
        // update is_done and other_state
    }

We need to abstract the local variables like is_done and other_state, and we need to abstract the while loop itself, because our yield-like operation is not going to use the stack. So let's create a while loop abstraction and associated classes:

    enum WhileState { BREAK, CONTINUE, YIELD }

    abstract class WhileLoop<T> {
        // protected (not private) so that subclasses can read and update it from execute()
        protected boolean is_done;
        public boolean isDone() { return is_done; }

        private T rval;
        public T getReturnValue() { return rval; }
        protected void setReturnValue(T val) { rval = val; }

        // Runs iterations until one of them yields a value or breaks out of the loop.
        public T loop() {
            while (true) {
                WhileState state = execute();
                if (state == WhileState.YIELD)
                    return getReturnValue();
                else if (state == WhileState.BREAK) {
                    is_done = true;
                    return null;
                }
            }
        }

        // A single iteration of the loop body.
        protected abstract WhileState execute();
    }

The basic trick here is to move local variables into class variables and turn scope blocks into classes, which gives us the ability to "re-enter" our loop after yielding our return value.

Now to implement our producer:

    public class SampleProducer implements CoroutineProducer<Object> {
        private WhileLoop<Object> loop; // our control structures become state!!

        public SampleProducer() {
            loop = new WhileLoop<Object>() {
                private int other_state; // our local variables become state of the control structure

                @Override
                protected WhileState execute() {
                    // this implements a single iteration of the loop
                    if (is_done) return WhileState.BREAK;
                    // read input
                    // parse input
                    Object calculated_value = ...;
                    // update is_done, figure out if we want to continue
                    setReturnValue(calculated_value);
                    return WhileState.YIELD;
                }
            };
        }

        public Object Produce() {
            Object val = loop.loop();
            return val;
        }

        public boolean isDone() {
            // we are done when the loop has exited
            return loop.isDone();
        }
    }

Similar tricks can be done for other basic control flow structures. Ideally you would build up a library of these helper classes and then use them to implement these simple interfaces, which would ultimately give you the semantics of coroutines. I'm sure everything I've written here can be generalized and expanded upon greatly.
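To make the idea concrete, here is a minimal, self-contained sketch of the same heap-state technique. It is a simplified variant rather than the code above: the names StepProducer, StepConsumer, SquaresProducer, CollectingConsumer and HeapStateDemo are hypothetical, and the only "control structure" is a loop counter that has been moved from a local variable into a field so the computation can be resumed one step at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: drives a producer/consumer pair in a plain loop.
class HeapStateDemo {
    static <T> void execute(StepProducer<T> prod, StepConsumer<T> con) {
        while (!prod.isDone()) {
            con.consume(prod.produce());
        }
    }

    static List<Integer> run(int limit) {
        CollectingConsumer sink = new CollectingConsumer();
        execute(new SquaresProducer(limit), sink);
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(run(4)); // prints [0, 1, 4, 9]
    }
}

interface StepProducer<T> {
    T produce();
    boolean isDone();
}

interface StepConsumer<T> {
    void consume(T value);
}

// Yields the squares of 0 .. limit-1, one value per call to produce().
final class SquaresProducer implements StepProducer<Integer> {
    private final int limit;
    private int next = 0; // would be a local loop counter; lives on the heap instead

    SquaresProducer(int limit) {
        this.limit = limit;
    }

    @Override
    public boolean isDone() {
        return next >= limit;
    }

    @Override
    public Integer produce() {
        int value = next * next;
        next++; // advance the saved "loop" so the next call resumes where we left off
        return value;
    }
}

// Records every value it is handed, so the result can be inspected afterwards.
final class CollectingConsumer implements StepConsumer<Integer> {
    final List<Integer> seen = new ArrayList<>();

    @Override
    public void consume(Integer value) {
        seen.add(value);
    }
}
```

Neither side calls the other directly; the driver loop does the calling, which is the symmetry the answer is after.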

+31
May 17 '10 at 6:02 a.m.

I would suggest taking a look at Kotlin coroutines on the JVM. They fall into a different category, though. There is no bytecode manipulation involved, and it works on Android, too. However, you will have to write your coroutines in Kotlin. The upside is that Kotlin is designed with Java interoperability in mind, so you can continue to use all your Java libraries and freely combine Kotlin and Java code in the same project, even putting them side by side in the same directories and packages.

The guide to kotlinx.coroutines provides many more examples, while the coroutines design document explains the motivation, use cases, and implementation details.

+8
Apr 05 '17 at 10:10

I just came across this question and want to mention that I think it might be possible to implement coroutines or generators in a way similar to what C# does. That said, I don't actually use Java, but the CIL has much the same limitations as the JVM.

The yield statement in C# is a pure language feature and is not part of the CIL bytecode. The C# compiler simply creates a hidden private class for each generator function. If you use the yield statement in a function, it has to return an IEnumerator or an IEnumerable. The compiler "packs" your code into a state-machine-like class.

The C# compiler might use some gotos in the generated code to make the conversion into a state machine easier. I don't know the capabilities of Java bytecode or whether it has something like a plain unconditional jump, but at the "assembly" level this is usually possible.

As already mentioned, this feature has to be implemented in the compiler. Because I have only a little knowledge of Java and its compiler, I can't tell whether it is possible to alter or extend the compiler, perhaps with a "preprocessor" or something similar.
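As a rough illustration of the kind of class such a compiler could generate, here is a hand-written Java sketch. It is not actual C# compiler output. JVM bytecode does have an unconditional goto instruction, but Java source does not expose it, so the sketch uses a switch on a saved state field instead. The names CountingGenerator and GeneratorDemo are hypothetical, and the generator being modelled is the imaginary body "yield 0; for (i = 1; i <= n; i++) yield i * 10; yield -1;".

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical demo class: drains the generator into a list.
class GeneratorDemo {
    static List<Integer> collect(int n) {
        List<Integer> out = new ArrayList<>();
        Iterator<Integer> it = new CountingGenerator(n);
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(3)); // prints [0, 10, 20, 30, -1]
    }
}

// Hand-written state machine for: yield 0; for (i = 1..n) yield i * 10; yield -1;
final class CountingGenerator implements Iterator<Integer> {
    private final int n;
    private int state = 0;   // which part of the original body to resume in
    private int i;           // hoisted local variable of the for loop
    private Integer current; // value produced by the most recent step
    private boolean ready;   // true when 'current' has not been handed out yet

    CountingGenerator(int n) {
        this.n = n;
    }

    // Runs the body up to the next "yield"; returns false once the body has finished.
    private boolean moveNext() {
        switch (state) {
            case 0: // start of the body: first yield, then set up the loop
                current = 0;
                i = 1;
                state = 1;
                return true;
            case 1: // inside the for loop, or just past it
                if (i <= n) {
                    current = i * 10;
                    i++;
                    return true;
                }
                current = -1; // the yield that follows the loop
                state = 2;
                return true;
            default: // body has run to completion
                return false;
        }
    }

    @Override
    public boolean hasNext() {
        if (!ready) {
            ready = moveNext();
        }
        return ready;
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        ready = false;
        return current;
    }
}
```

Each case label plays the role of a jump target: the state field records where to continue, and the hoisted field i keeps the loop variable alive between calls.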

Personally, I love coroutines. As a Unity game developer, I use them quite often. Since I play Minecraft a lot with ComputerCraft, I was curious why coroutines in Lua (LuaJ) are implemented with threads.

+3
Apr 20 '13 at 12:13

Kotlin uses the following approach for coroutines
(from https://kotlinlang.org/docs/reference/coroutines.html ):

Coroutines are completely implemented through a compilation technique (no support from the VM or OS side is required), and suspension works through code transformation. Basically, every suspending function (optimizations may apply, but we will not go into this here) is transformed into a state machine where the states correspond to suspending calls. Right before a suspension, the next state is stored in a field of a compiler-generated class along with the relevant local variables, etc. Upon resumption of that coroutine, the local variables are restored and the state machine proceeds from the state right after the suspension.

A suspended coroutine can be stored and passed around as an object that keeps its suspended state and locals. The type of such objects is Continuation, and the overall code transformation described here corresponds to the classical continuation-passing style. Consequently, suspending functions take an extra parameter of type Continuation under the hood.

Check out the design document at https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md
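The transformation described above can be imitated by hand in plain Java. The following is a heavily simplified sketch and not what the Kotlin compiler actually emits: the Continuation interface here is a one-method stand-in, fetch is a made-up "suspending" call that completes later through a toy event queue, and SumStateMachine and CpsDemo are hypothetical names. It models a function that suspends twice and then returns the sum of the two results.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical demo class with a toy event loop.
class CpsDemo {
    // Work that is "suspended" waits here until the loop below runs it.
    static final Deque<Runnable> QUEUE = new ArrayDeque<>();

    // Stand-in for a suspending call: it returns immediately and resumes
    // the continuation later with a result (here simply key * 10).
    static void fetch(int key, Continuation<Integer> cont) {
        QUEUE.add(() -> cont.resume(key * 10));
    }

    static int run() {
        int[] result = new int[1];
        new SumStateMachine(value -> result[0] = value).resume(null);
        while (!QUEUE.isEmpty()) {
            QUEUE.poll().run(); // each task resumes the state machine once
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 30
    }
}

// Simplified stand-in for a continuation: something that can be resumed with a value.
interface Continuation<T> {
    void resume(T value);
}

// Hand-written equivalent of: int a = fetch(1); int b = fetch(2); return a + b;
final class SumStateMachine implements Continuation<Integer> {
    private final Continuation<Integer> completion; // the caller's continuation
    private int label = 0; // state to continue from on the next resume
    private int first;     // local variable saved across the second suspension

    SumStateMachine(Continuation<Integer> completion) {
        this.completion = completion;
    }

    @Override
    public void resume(Integer value) {
        switch (label) {
            case 0: // initial entry: issue the first suspending call
                label = 1;
                CpsDemo.fetch(1, this);
                return;
            case 1: // resumed with the first result: save it, suspend again
                first = value;
                label = 2;
                CpsDemo.fetch(2, this);
                return;
            case 2: // resumed with the second result: finish and pass the sum on
                label = 3;
                completion.resume(first + value);
                return;
            default:
                throw new IllegalStateException("already completed");
        }
    }
}
```

The state machine object is the suspended coroutine: between resume calls nothing of it lives on the stack, only the label and the saved local in its fields.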

+1
Apr 6 '17 at 4:08

I have a Coroutine class that I use in Java. It is based on threads, and using threads has the advantage of allowing parallel operation, which can pay off on multi-core machines. So you might want to consider a thread-based approach.
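The class mentioned in this answer is not shown, so the following is only a guess at the general shape of a thread-based approach, not the author's code. It is a minimal sketch of a generator backed by a worker thread that hands values over through a SynchronousQueue. ThreadGenerator and ThreadGeneratorDemo are hypothetical names, null values are not supported (the queue rejects them), and a generator that is abandoned early leaves its daemon worker thread blocked.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Consumer;

// Hypothetical demo class: collects the first n squares from a thread-backed generator.
class ThreadGeneratorDemo {
    static List<Integer> firstSquares(int n) {
        Iterator<Integer> gen = new ThreadGenerator<Integer>(emit -> {
            for (int i = 0; i < n; i++) {
                emit.accept(i * i); // blocks until the consumer takes the value
            }
        });
        List<Integer> out = new ArrayList<>();
        while (gen.hasNext()) {
            out.add(gen.next());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(firstSquares(4)); // prints [0, 1, 4, 9]
    }
}

// Runs the generator body on its own thread and exposes the emitted values as an Iterator.
final class ThreadGenerator<T> implements Iterator<T> {
    private static final Object DONE = new Object(); // end-of-stream marker
    private final SynchronousQueue<Object> channel = new SynchronousQueue<>();
    private Object pending; // value already taken from the channel but not yet returned

    ThreadGenerator(Consumer<Consumer<T>> body) {
        Thread worker = new Thread(() -> {
            try {
                body.accept(this::handOff);
            } finally {
                handOff(DONE); // signal completion even if the body throws
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive for an abandoned generator
        worker.start();
    }

    // Called on the worker thread: waits until the consumer is ready to receive.
    private void handOff(Object value) {
        try {
            channel.put(value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (pending == null) {
            try {
                pending = channel.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return pending != DONE;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        @SuppressWarnings("unchecked")
        T value = (T) pending;
        pending = null;
        return value;
    }
}
```

The worker can compute the next value while the consumer is still processing the previous one, which is the parallelism the answer refers to. The cost is one full thread and a blocking handoff per value, which is the overhead the question objects to.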

0
Sep 04 '12 at 10:33

Here is another option for Java 6+

Pythonic coroutine implementation:

    import java.lang.ref.WeakReference;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    class CorRunRAII {
        private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>();

        public CorRunRAII add(CorRun resource) {
            if (resource == null) {
                return this;
            }
            resources.add(new WeakReference<>(resource));
            return this;
        }

        public CorRunRAII addAll(List<? extends CorRun> arrayList) {
            if (arrayList == null) {
                return this;
            }
            for (CorRun corRun : arrayList) {
                add(corRun);
            }
            return this;
        }

        @Override
        protected void finalize() throws Throwable {
            super.finalize();
            for (WeakReference<? extends CorRun> corRunWeakReference : resources) {
                CorRun corRun = corRunWeakReference.get();
                if (corRun != null) {
                    corRun.stop();
                }
            }
        }
    }

    class CorRunYieldReturn<ReceiveType, YieldReturnType> {
        public final AtomicReference<ReceiveType> receiveValue;
        public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue;

        CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
            this.receiveValue = receiveValue;
            this.yieldReturnValue = yieldReturnValue;
        }
    }

    interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> {
        boolean start();
        void stop();
        void stop(final Throwable throwable);
        boolean isStarted();
        boolean isEnded();
        Throwable getError();

        ReceiveType getReceiveValue();
        void setResultForOuter(YieldReturnType resultForOuter);
        YieldReturnType getResultForOuter();

        YieldReturnType receive(ReceiveType value);
        ReceiveType yield();
        ReceiveType yield(YieldReturnType value);
        <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another);
        <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value);
    }

    abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

        private ReceiveType receiveValue;
        public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>();

        // Outside

        private AtomicBoolean isStarted = new AtomicBoolean(false);
        private AtomicBoolean isEnded = new AtomicBoolean(false);
        private Throwable error;

        private YieldReturnType resultForOuter;

        @Override
        public boolean start() {
            boolean isStarted = this.isStarted.getAndSet(true);
            if ((! isStarted) && (! isEnded())) {
                receive(null);
            }
            return isStarted;
        }

        @Override
        public void stop() {
            stop(null);
        }

        @Override
        public void stop(Throwable throwable) {
            isEnded.set(true);
            if (throwable != null) {
                error = throwable;
            }

            for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
                CorRun child = weakReference.get();
                if (child != null) {
                    child.stop();
                }
            }
        }

        @Override
        public boolean isStarted() {
            return isStarted.get();
        }

        @Override
        public boolean isEnded() {
            return isEnded.get();
        }

        @Override
        public Throwable getError() {
            return error;
        }

        @Override
        public ReceiveType getReceiveValue() {
            return receiveValue;
        }

        @Override
        public void setResultForOuter(YieldReturnType resultForOuter) {
            this.resultForOuter = resultForOuter;
        }

        @Override
        public YieldReturnType getResultForOuter() {
            return resultForOuter;
        }

        @Override
        public synchronized YieldReturnType receive(ReceiveType value) {
            receiveValue = value;

            run();

            return getResultForOuter();
        }

        @Override
        public ReceiveType yield() {
            // Qualified with "this." because an unqualified call to a method
            // named yield is rejected by Java 14 and later.
            return this.yield(null);
        }

        @Override
        public ReceiveType yield(YieldReturnType value) {
            resultForOuter = value;
            return receiveValue;
        }

        @Override
        public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) {
            return yieldFrom(another, null);
        }

        @Override
        public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) {
            if (another == null || another.isEnded()) {
                throw new RuntimeException("Call null or isEnded coroutine");
            }

            potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

            synchronized (another) {
                boolean isStarted = another.start();
                boolean isJustStarting = ! isStarted;
                if (isJustStarting && another instanceof CorRunSync) {
                    return another.getResultForOuter();
                }

                return another.receive(value);
            }
        }

        @Override
        public void run() {
            try {
                this.call();
            } catch (Exception e) {
                e.printStackTrace();

                stop(e);
                return;
            }
        }
    }

    abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> {

        private final ExecutorService childExecutorService = newExecutorService();
        private ExecutorService executingOnExecutorService;

        private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>());

        private final CorRun<ReceiveType, YieldReturnType> self;
        public final List<WeakReference<CorRun>> potentialChildrenCoroutineList;
        private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn;

        private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue;

        // Outside

        private AtomicBoolean isStarted = new AtomicBoolean(false);
        private AtomicBoolean isEnded = new AtomicBoolean(false);
        private Future<YieldReturnType> future;
        private Throwable error;

        private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>();

        CorRunThread() {
            executingOnExecutorService = childExecutorService;

            receiveQueue = new LinkedBlockingDeque<>();
            potentialChildrenCoroutineList = new ArrayList<>();

            self = this;
        }

        @Override
        public void run() {
            try {
                self.call();
            } catch (Exception e) {
                stop(e);
                return;
            }

            stop();
        }

        @Override
        public abstract YieldReturnType call();

        @Override
        public boolean start() {
            return start(childExecutorService);
        }

        protected boolean start(ExecutorService executorService) {
            boolean isStarted = this.isStarted.getAndSet(true);
            if (!isStarted) {
                executingOnExecutorService = executorService;
                future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self);
            }
            return isStarted;
        }

        @Override
        public void stop() {
            stop(null);
        }

        @Override
        public void stop(final Throwable throwable) {
            if (throwable != null) {
                error = throwable;
            }
            isEnded.set(true);

            returnYieldValue(null);
            // Do this for making sure the coroutine has checked isEnd() after getting a dummy value
            receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN);

            for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) {
                CorRun child = weakReference.get();
                if (child != null) {
                    if (child instanceof CorRunThread) {
                        ((CorRunThread)child).tryStop(childExecutorService);
                    }
                }
            }

            childExecutorService.shutdownNow();
        }

        protected void tryStop(ExecutorService executorService) {
            if (this.executingOnExecutorService == executorService) {
                stop();
            }
        }

        @Override
        public boolean isEnded() {
            return isEnded.get() || (
                    future != null && (future.isCancelled() || future.isDone())
                    );
        }

        @Override
        public boolean isStarted() {
            return isStarted.get();
        }

        public Future<YieldReturnType> getFuture() {
            return future;
        }

        @Override
        public Throwable getError() {
            return error;
        }

        @Override
        public void setResultForOuter(YieldReturnType resultForOuter) {
            this.resultForOuter.set(resultForOuter);
        }

        @Override
        public YieldReturnType getResultForOuter() {
            return this.resultForOuter.get();
        }

        @Override
        public YieldReturnType receive(ReceiveType value) {
            LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>();

            offerReceiveValue(value, yieldReturnValue);

            try {
                AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take();
                return takeValue == null ? null : takeValue.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return null;
        }

        @Override
        public ReceiveType yield() {
            // Qualified with "this." because an unqualified call to a method
            // named yield is rejected by Java 14 and later.
            return this.yield(null);
        }

        @Override
        public ReceiveType yield(final YieldReturnType value) {
            returnYieldValue(value);

            return getReceiveValue();
        }

        @Override
        public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) {
            return yieldFrom(another, null);
        }

        @Override
        public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) {
            if (another == null || another.isEnded()) {
                throw new RuntimeException("Call null or isEnded coroutine");
            }

            boolean isStarted = false;
            potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another));

            synchronized (another) {
                if (another instanceof CorRunThread) {
                    isStarted = ((CorRunThread)another).start(childExecutorService);
                } else {
                    isStarted = another.start();
                }

                boolean isJustStarting = ! isStarted;
                if (isJustStarting && another instanceof CorRunSync) {
                    return another.getResultForOuter();
                }

                TargetYieldReturnType send = another.receive(value);
                return send;
            }
        }

        @Override
        public ReceiveType getReceiveValue() {
            setLastCorRunYieldReturn(takeLastCorRunYieldReturn());

            return lastCorRunYieldReturn.receiveValue.get();
        }

        protected void returnYieldValue(final YieldReturnType value) {
            CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn;
            if (corRunYieldReturn != null) {
                corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value));
            }
        }

        protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) {
            receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue));
        }

        protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() {
            try {
                return receiveQueue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            return null;
        }

        protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) {
            this.lastCorRunYieldReturn = lastCorRunYieldReturn;
        }

        protected ExecutorService newExecutorService() {
            return Executors.newCachedThreadPool(getThreadFactory());
        }

        protected ThreadFactory getThreadFactory() {
            return new ThreadFactory() {
                @Override
                public Thread newThread(final Runnable runnable) {
                    Thread thread = new Thread(runnable);
                    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                        @Override
                        public void uncaughtException(Thread thread, Throwable throwable) {
                            throwable.printStackTrace();
                            if (runnable instanceof CorRun) {
                                CorRun self = (CorRun) runnable;
                                self.stop(throwable);
                                thread.interrupt();
                            }
                        }
                    });
                    return thread;
                }
            };
        }
    }

Now you can use pythonic coroutines this way (e.g. Fibonacci numbers)

Thread version:

    class Fib extends CorRunThread<Integer, Integer> {

        @Override
        public Integer call() {
            Integer times = getReceiveValue();
            do {
                int a = 1, b = 1;
                for (int i = 0; times != null && i < times; i++) {
                    int temp = a + b;
                    a = b;
                    b = temp;
                }
                // A pythonic "yield", i.e., it returns `a` to the caller and waits for the `times` value from the next caller
                // (qualified with "this." because Java 14+ rejects an unqualified call to a method named yield)
                times = this.yield(a);
            } while (! isEnded());

            setResultForOuter(Integer.MAX_VALUE);
            return getResultForOuter();
        }
    }

    class MainRun extends CorRunThread<String, String> {

        @Override
        public String call() {

            // The fib coroutine would be recycled by its parent
            // (no requirement to call its start() and stop() manually)
            // Otherwise, if you want to share its instance and start/stop it manually,
            // please start it before being called by yieldFrom() and stop it in the end.
            Fib fib = new Fib();
            String result = "";
            Integer current;
            int times = 10;
            for (int i = 0; i < times; i++) {

                // A pythonic "yield from", i.e., it calls fib with the `i` parameter and waits for the returned value as `current`
                current = yieldFrom(fib, i);

                if (fib.getError() != null) {
                    throw new RuntimeException(fib.getError());
                }

                if (current == null) {
                    continue;
                }

                if (i > 0) {
                    result += ",";
                }
                result += current;
            }

            setResultForOuter(result);

            return result;
        }
    }

Sync (non-thread) version:

    class Fib extends CorRunSync<Integer, Integer> {

        @Override
        public Integer call() {
            Integer times = getReceiveValue();

            int a = 1, b = 1;
            for (int i = 0; times != null && i < times; i++) {
                int temp = a + b;
                a = b;
                b = temp;
            }
            // Qualified with "this." because Java 14+ rejects an unqualified call to a method named yield
            this.yield(a);

            return getResultForOuter();
        }
    }

    class MainRun extends CorRunSync<String, String> {

        @Override
        public String call() {

            CorRun<Integer, Integer> fib = null;
            try {
                fib = new Fib();
            } catch (Exception e) {
                e.printStackTrace();
            }

            String result = "";
            Integer current;
            int times = 10;
            for (int i = 0; i < times; i++) {

                current = yieldFrom(fib, i);

                if (fib.getError() != null) {
                    throw new RuntimeException(fib.getError());
                }

                if (current == null) {
                    continue;
                }

                if (i > 0) {
                    result += ",";
                }
                result += current;
            }

            stop();
            setResultForOuter(result);

            // Plain String check; the original relied on an undefined Utils helper
            if (result.isEmpty()) {
                throw new RuntimeException("Error");
            }

            return result;
        }
    }



Execution (both versions will work):

    // Run the entry coroutine
    MainRun mainRun = new MainRun();
    mainRun.start();

    // Wait for mainRun ending for 5 seconds
    long startTimestamp = System.currentTimeMillis();
    while (!mainRun.isEnded()) {
        if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) {
            throw new RuntimeException("Wait too much time");
        }
    }
    // The result should be "1,1,2,3,5,8,13,21,34,55"
    System.out.println(mainRun.getResultForOuter());

0
Nov 24 '17 at 8:47

There is also Quasar for Java, and Project Loom at Oracle, where the JVM is being extended with fibers and continuations. There is a presentation of Loom on YouTube, and there are several more; they are easy to find with a little searching.

0
Jun 11 '19 at 14:00
