Modeling producer-consumer semantics with type classes?

If some objects in a system can act as producers of data or events, and other objects can act as consumers, does it make sense to externalize these “orthogonal concerns” into Producer and Consumer classes?

I see that the Haskell Pipes library takes this approach. I realize this question may look pretty basic to people coming from a Haskell background, but I would be interested in a Scala perspective and examples, because I don't see many.

1 answer

You should take a look at this Matt Might article.

It gives you a simple implementation of Producer, Consumer and Transducer (the Pipe in the Haskell library you mentioned), and an example of how to use them to build a web server.

Basically, each Producer extends Runnable and has its own buffer for outgoing items. The buffer is a Java ArrayBlockingQueue, which is thread-safe.

Each Consumer is also a Runnable and has an input buffer built the same way.

When you bind a Consumer to a Producer, you create another Runnable. When started, it starts the Producer and the Consumer (both are Runnable) and transfers data between them.

When you bind a Transducer to a Producer, you get a new Producer.
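To make the buffer hand-off concrete, here is a minimal sketch of two threads exchanging items through a bounded ArrayBlockingQueue. The names `BufferDemo` and `transfer` are made up for illustration and do not come from the article.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BufferDemo {
  // Moves the numbers 1..n from a writer thread to the calling thread
  // through a bounded queue of the given capacity.
  def transfer(n: Int, capacity: Int): List[Int] = {
    val buffer = new ArrayBlockingQueue[Int](capacity)
    val writer = new Thread(new Runnable {
      // put() blocks while the queue is full, so the writer is throttled
      def run(): Unit = (1 to n).foreach(i => buffer.put(i))
    })
    writer.start()
    // take() blocks while the queue is empty; no extra locking is needed
    val received = List.fill(n)(buffer.take())
    writer.join()
    received
  }
}
```

With a single writer and a single reader the queue preserves order, so `BufferDemo.transfer(5, 2)` returns `List(1, 2, 3, 4, 5)` even though the buffer only holds two items at a time.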

So, once you have implemented it, you can write in the Haskell style:

    listen ==> connect ==> process ==> reply

Here is the code copied and improved from the link above:

    import java.util.concurrent.ArrayBlockingQueue

    // Anything that runs on its own thread.
    trait Coroutine extends Runnable {
      def start(): Unit = {
        val myThread = new Thread(this)
        myThread.start()
      }
    }

    trait Producer[O] extends Coroutine {
      // Bounded, thread-safe output buffer; put() blocks while it is full.
      private val outputs = new ArrayBlockingQueue[O](1024)

      protected def put(output: O): Unit = outputs.put(output)

      def next(): O = outputs.take()

      // Binding a Consumer yields a Coroutine that pumps items between the two.
      def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
        val that = this
        new Coroutine {
          def run(): Unit = {
            while (true) {
              val o = that.next()
              consumer.accept(o)
            }
          }

          override def start(): Unit = {
            that.start()
            consumer.start()
            super.start()
          }
        }
      }
    }

    trait Consumer[I] extends Coroutine {
      // Bounded, thread-safe input buffer; take() blocks while it is empty.
      private val inputs = new ArrayBlockingQueue[I](1024)

      def accept(input: I): Unit = inputs.put(input)

      protected def get(): I = inputs.take()
    }

And here is how you could use it:

    case class IntProducer(zero: Int) extends Producer[Int] {
      def run(): Unit = {
        var i = zero
        while (true) { put(i); i += 1 }
      }
    }

    object Printer extends Consumer[Any] {
      def run(): Unit = {
        while (true) { println(get()) }
      }
    }

    val pip = IntProducer(0) ==> Printer
    pip.start()

To see more examples and how to handle the Transducer, see my Gist.
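For a rough idea of how a Transducer could fit into the traits above, here is a minimal sketch. It is not taken from the Gist or the article: `TransducerSketch`, `lift` and `pipe` are hypothetical names, `pipe` is a standalone function instead of another `==>` overload, and the threads are marked as daemons (unlike the code above) so that a demo JVM can exit while the endless loops are still running.

```scala
import java.util.concurrent.ArrayBlockingQueue

object TransducerSketch {
  // Compact copies of the traits above; the daemon flag is the only change.
  trait Coroutine extends Runnable {
    def start(): Unit = {
      val t = new Thread(this)
      t.setDaemon(true) // assumption: lets the demo JVM terminate
      t.start()
    }
  }

  trait Producer[O] extends Coroutine {
    private val outputs = new ArrayBlockingQueue[O](1024)
    protected def put(output: O): Unit = outputs.put(output)
    def next(): O = outputs.take()
  }

  trait Consumer[I] extends Coroutine {
    private val inputs = new ArrayBlockingQueue[I](1024)
    def accept(input: I): Unit = inputs.put(input)
    protected def get(): I = inputs.take()
  }

  // A Transducer consumes I values and produces O values.
  trait Transducer[I, O] extends Consumer[I] with Producer[O]

  // Hypothetical helper: lifts a plain function into a Transducer.
  def lift[I, O](f: I => O): Transducer[I, O] = new Transducer[I, O] {
    def run(): Unit = while (true) put(f(get()))
  }

  // Hypothetical helper: binding a Producer to a Transducer gives a new Producer.
  def pipe[A, B](source: Producer[A], stage: Transducer[A, B]): Producer[B] =
    new Producer[B] {
      // Pump thread: forwards everything the source emits into the stage.
      def run(): Unit = while (true) stage.accept(source.next())
      // Results are read from the stage's output buffer.
      override def next(): B = stage.next()
      override def start(): Unit = {
        source.start()
        stage.start()
        super.start()
      }
    }

  // Same counter as in the usage example above.
  case class IntProducer(zero: Int) extends Producer[Int] {
    def run(): Unit = { var i = zero; while (true) { put(i); i += 1 } }
  }
}
```

With these definitions, `pipe(IntProducer(0), lift((i: Int) => i * 2))` is itself a Producer[Int]; once started, successive calls to `next()` yield 0, 2, 4 and so on. Because the result is an ordinary Producer, it can be piped into a further stage in the same way.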


Source: https://habr.com/ru/post/1200489/

