Take a look at this Matt Might article.
It gives a simple implementation of Producer, Consumer, and Transducer (the Pipe in the Haskell library you mentioned), along with an example of how to use them to build a web server.
Basically, each Producer extends Runnable and has a private output buffer that holds the items it emits. The buffer is a Java ArrayBlockingQueue, which is thread-safe.
Each Consumer is also a Runnable and has an input buffer built the same way.
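To make the buffer behaviour concrete, here is a minimal sketch of the blocking handoff that ArrayBlockingQueue provides between two threads. The names BufferDemo and roundTrip are made up for this illustration and are not part of the article's code.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BufferDemo {
  // Hypothetical helper: one thread pushes n integers through a bounded
  // queue while the calling thread collects them.
  def roundTrip(n: Int, capacity: Int): List[Int] = {
    val buffer = new ArrayBlockingQueue[Int](capacity)

    val producer = new Thread(new Runnable {
      // put blocks while the queue is full
      def run(): Unit = (0 until n).foreach(i => buffer.put(i))
    })
    producer.start()

    // take blocks while the queue is empty
    val received = List.fill(n)(buffer.take())
    producer.join()
    received
  }
}
```

Because put and take block instead of failing, a fast producer is automatically slowed down to the pace of its consumer, which is what lets each stage run on its own thread without extra locking.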
When you bind a Consumer to a Producer, you create another Runnable. When started, it launches the Producer and the Consumer (both are Runnable) and transfers data between them.
When you bind a Transducer to a Producer, you get a new Producer.
With that in place, you can write pipelines in the Haskell style:
listen ==> connect ==> process ==> reply
Here is the code, copied from the link above and improved:
    import java.util.concurrent.ArrayBlockingQueue

    // A Runnable that can start itself on its own thread.
    trait Coroutine extends Runnable {
      def start(): Unit = {
        val myThread = new Thread(this)
        myThread.start()
      }
    }

    trait Producer[O] extends Coroutine {
      // Bounded, thread-safe output buffer; put blocks while it is full.
      private val outputs = new ArrayBlockingQueue[O](1024)

      protected def put(output: O): Unit = outputs.put(output)
      def next(): O = outputs.take()

      // Bind a consumer: the resulting Coroutine starts both ends and
      // forwards every produced item to the consumer.
      def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
        val that = this
        new Coroutine {
          def run(): Unit = {
            while (true) {
              val o = that.next()
              consumer.accept(o)
            }
          }

          override def start(): Unit = {
            that.start()
            consumer.start()
            super.start()
          }
        }
      }
    }

    trait Consumer[I] extends Coroutine {
      // Bounded, thread-safe input buffer; take blocks while it is empty.
      private val inputs = new ArrayBlockingQueue[I](1024)

      def accept(input: I): Unit = inputs.put(input)
      protected def get(): I = inputs.take()
    }
And here is how you could use it:
    // Emits zero, zero + 1, zero + 2, ... forever.
    case class IntProducer(zero: Int) extends Producer[Int] {
      def run(): Unit = {
        var i = zero
        while (true) {
          put(i)
          i += 1
        }
      }
    }

    // Prints whatever it receives; a Consumer[Any] is accepted because of I >: O.
    object Printer extends Consumer[Any] {
      def run(): Unit = {
        while (true) {
          println(get())
        }
      }
    }

    val pip = IntProducer(0) ==> Printer
    pip.start()
For more examples, and for how to handle the "Converter", see my Gist.
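As a rough idea of what binding a Transducer to a Producer could look like, here is a minimal, self-contained sketch. It is one possible shape, not necessarily what the article or the Gist does. Assumptions: Transducer is modelled as a Consumer that is also a Producer, threads are made daemon threads so the endless loops do not keep the JVM alive, the Consumer overload of ==> is left out to keep it short, and Counter, Doubler and firstDoubled are hypothetical names used only for the demonstration.

```scala
import java.util.concurrent.ArrayBlockingQueue

object TransducerSketch {
  // Compact copies of the traits so the sketch compiles on its own.
  // Assumption: daemon threads, so the endless loops do not block JVM exit.
  trait Coroutine extends Runnable {
    def start(): Unit = {
      val t = new Thread(this)
      t.setDaemon(true)
      t.start()
    }
  }

  trait Consumer[I] extends Coroutine {
    private val inputs = new ArrayBlockingQueue[I](1024)
    def accept(input: I): Unit = inputs.put(input)
    protected def get(): I = inputs.take()
  }

  trait Producer[O] extends Coroutine {
    private val outputs = new ArrayBlockingQueue[O](1024)
    protected def put(output: O): Unit = outputs.put(output)
    def next(): O = outputs.take()

    // Binding a transducer yields a new Producer. Its thread pumps upstream
    // items into the transducer; readers pull from the transducer's output.
    def ==>[O2](t: Transducer[O, O2]): Producer[O2] = {
      val that = this
      new Producer[O2] {
        def run(): Unit = while (true) t.accept(that.next())
        override def next(): O2 = t.next()
        override def start(): Unit = {
          that.start()
          t.start()
          super.start()
        }
      }
    }
  }

  // Assumed definition: a stage that both consumes I and produces O.
  trait Transducer[I, O] extends Consumer[I] with Producer[O]

  // Hypothetical stages, for illustration only.
  case class Counter(from: Int) extends Producer[Int] {
    def run(): Unit = {
      var i = from
      while (true) { put(i); i += 1 }
    }
  }

  class Doubler extends Transducer[Int, Int] {
    def run(): Unit = while (true) put(get() * 2)
  }

  // Collects the first n items of Counter(from) ==> Doubler.
  def firstDoubled(from: Int, n: Int): List[Int] = {
    val stage = new Doubler
    val doubled = Counter(from) ==> stage
    doubled.start()
    List.fill(n)(doubled.next())
  }
}
```

Since the result of ==> is itself a Producer, stages chain from left to right, which is what makes a pipeline such as listen ==> connect ==> process ==> reply read naturally. Pulling from the transducer's own output buffer, rather than copying one output per input, also means a stage that drops or multiplies items would not stall the pump loop.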