`foreach` over a parallel set never starts

I have a Mongo database with jobs that I would like to process in parallel; I thought I'd experiment with parallel collections to handle the threading transparently for me (not that it would be much harder to use a thread pool explicitly). I came up with this code:

import scala.annotation.tailrec
import scala.collection.mutable.SynchronizedQueue
import org.bson.types.ObjectId

def run(stopSignal: SynchronizedQueue[Any]) = {
  val queue = new Iterator[Job] {
    private var prevId = new ObjectId("000000000000000000000000")

    def hasNext = stopSignal.isEmpty

    @tailrec
    def next = {
      val job = Job
        .where(_.status eqs Pending)
        // this works because the IDs start with a timestamp part
        .where(_._id gt prevId)
        .orderAsc(_.regTime)
        .get()
      job match {
        case Some(job) =>
          prevId = job.id
          println(s"next() => ${job.id}")
          job
        case None if hasNext =>
          Thread.sleep(500) // TODO: use a tailable cursor instead
          next
        case None =>
          throw new InterruptedException
      }
    }
  }

  try {
    queue.toStream.par.foreach { job =>
      println(s"processing ${job.id}...")
      processOne(job)
      println(s"processing complete: ${job.id}")
    }
  } catch { case _: InterruptedException => }
}

this gives:

next() => 53335f7bef867e6f0805abdb
next() => 53335fc6ef867e6f0805abe2
next() => 53335ffcef867e6f0805abe6
next() => 53336005ef867e6f0805abe7
next() => 53336008ef867e6f0805abe8
next() => 5333600cef867e6f0805abe9

but processing never begins; that is, the function passed to `foreach` is never called. If I remove the `.par` call, it works fine (but sequentially, of course).

What exactly is going on here? How can I work around it? Or should I just give up on parallel collections and move to a plain thread pool instead?


`par` needs to build a `ParSeq`, so `queue.toStream.par` must first evaluate the entire stream (it keeps pulling from the iterator while `hasNext` is true). Since your iterator blocks waiting for new jobs and never runs out, that conversion never completes, and the function passed to `foreach` is never called. You can see the forcing in the REPL:

scala> (1 to 100).iterator.toStream
res7: scala.collection.immutable.Stream[Int] = Stream(1, ?)

scala> (1 to 100).iterator.toStream.par
res8: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

Note how `.par` turns the lazy `Stream(1, ?)` into a fully evaluated `ParVector`; with an iterator that blocks waiting for new jobs, this forcing never finishes.
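To see the forcing without a database, here is a self-contained sketch (Scala 2.12 or earlier, where `Stream` still has `.par`); the counter and names are mine, not from the answer:

```scala
// A lazy Stream evaluates only its head eagerly; .par forces everything.
var evaluated = 0
val s = Stream.from(1).take(5).map { i => evaluated += 1; i }
val beforePar = evaluated // only the head has been evaluated so far

val p = s.par             // converts to a ParVector, forcing ALL elements
val afterPar = evaluated  // now every element has been evaluated

println(s"before .par: $beforePar, after .par: $afterPar")
```

With an infinite (or blocking) iterator behind the stream, the `.par` step never returns, which is exactly the hang observed in the question.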


As @jilen explained, `.par` forces the whole stream, so parallel collections won't work here; I ended up throttling job submission to a thread pool instead:

I used a `BlockingExecutor` (ported to Scala) from http://www.javacodegeeks.com/2013/11/throttling-task-submission-with-a-blockingexecutor-2.html (based on a pattern from Java Concurrency in Practice), used like this:

// 2 processing; 2 in the queue; 4 total
val executor = new BlockingExecutor(poolSize = 2, queueSize = 2)

try {
  queue.foreach { job =>
    executor.submit(new Runnable {
      def run = {
        println(s"processing ${job.id}...")
        processOne(job)
        println(s"processing complete: ${job.id}")
      }
    })
  }
} catch { case _: InterruptedException => }
executor.shutdown
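The `BlockingExecutor` itself is not shown above; a minimal Scala sketch of the pattern from the linked article — a semaphore with `poolSize + queueSize` permits that makes `submit` block when the pool and queue are full — might look like this (names and details are my assumptions, not the author's actual code):

```scala
import java.util.concurrent.{Executors, Semaphore, TimeUnit}

// Throttles task submission: at most poolSize tasks running plus
// queueSize tasks waiting; further submit() calls block the caller.
class BlockingExecutor(poolSize: Int, queueSize: Int) {
  private val pool = Executors.newFixedThreadPool(poolSize)
  private val semaphore = new Semaphore(poolSize + queueSize)

  def submit(task: Runnable): Unit = {
    semaphore.acquire() // blocks when poolSize + queueSize tasks are in flight
    try {
      pool.execute(new Runnable {
        def run(): Unit =
          try task.run()
          finally semaphore.release() // free a permit when the task finishes
      })
    } catch {
      case e: Throwable =>
        semaphore.release() // the task was never scheduled; return the permit
        throw e
    }
  }

  def shutdown(): Unit = {
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.MINUTES)
  }
}
```

Unlike the `.par` approach, the producer loop here pulls jobs one at a time and blocks on `submit` whenever the workers fall behind, so nothing tries to materialize the infinite job stream up front.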

Source: https://habr.com/ru/post/1533979/

