Fan / fan exit channel in close

I create objects by consuming from several sokloknykh procedures and casting back to resultChannel. The manufacturer closes its channel after the last item.

The code never ends as resultChannel never closes. How to detect and correctly complete an iteration so hasNext() return false ?

 val inputData = (0..99).map { "Input$it" } val threads = 10 val bundleProducer = produce<String>(CommonPool, threads) { inputData.forEach { item -> send(item) println("Producing: $item") } println("Producing finished") close() } val resultChannel = Channel<String>(threads) repeat(threads) { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } val iterator = object : Iterator<String> { val iterator = resultChannel.iterator() override fun hasNext() = runBlocking { iterator.hasNext() } override fun next() = runBlocking { iterator.next() } }.asSequence() println("Starting interation...") val result = iterator.toList() println("finish: ${result.size}") 
+5
source share
1 answer

You can run a coroutine that is waiting for completion, and then close resultChannel .

First, rewrite the code that runs consumers to save Job s:

 val jobs = (1..threads).map { launch(CommonPool) { bundleProducer.consumeEach { println("CONSUMING $it") resultChannel.send("Result ($it)") } } } 

Then run another coroutine that closes the channel after all Job complete:

 launch(CommonPool) { jobs.forEach { it.join() } resultChannel.close() } 
+3
source

Source: https://habr.com/ru/post/1270383/


All Articles