Scala: Group an Iterable into an Iterable of Iterables by predicate

I have very large Iterators that I want to split into pieces. I have a predicate that looks at an element and returns true if it is the start of a new piece. I need the pieces to be Iterators, because even the pieces will not fit in memory. There are so many pieces that I would be wary of a recursive solution blowing the stack. The situation is similar to this question, but I need Iterators instead of Lists, and the "sentinels" (elements for which the predicate is true) occur (and should be included) at the beginning of a piece. The resulting iterators will only be used in order, although some of them may not be used at all, and they should use only O(1) memory. I assume this means they must all share the same underlying iterator. Performance is important.

If I were to take a stab at a function signature, it would be this:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ... 

I would have loved to use takeWhile, but it loses the last element. I looked into span, but it buffers results. My best idea so far involves a BufferedIterator, but there may be a better way.

You will know you have it right because something like this does not crash your JVM:
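To make the takeWhile problem concrete, here is a minimal sketch on a tiny iterator. The object name, method names, and values are illustrative assumptions, not part of the question. takeWhile has to read the first non-matching element to know where to stop, so that element is no longer available from the underlying iterator, whereas a BufferedIterator can inspect head without consuming it. Note that the Scala documentation says an iterator should not be reused after calling takeWhile on it, so the first method shows commonly observed behaviour rather than a guarantee.

```scala
object PeekDemo {
  // takeWhile must read the first non-matching element to know where to stop.
  def takeWhileLosesOne(): (List[Int], List[Int]) = {
    val it = Iterator(1, 2, 3, 4, 5)
    val prefix = it.takeWhile(_ < 3).toList
    // Reusing `it` here is officially unspecified; typically the element 3
    // has already been consumed, so it shows up in neither list.
    (prefix, it.toList)
  }

  // A BufferedIterator lets us look at `head` without advancing.
  def bufferedKeepsAll(): (List[Int], List[Int]) = {
    val it = Iterator(1, 2, 3, 4, 5).buffered
    val prefix = List.newBuilder[Int]
    while (it.hasNext && it.head < 3) prefix += it.next()
    // The sentinel 3 is still the next element of `it`.
    (prefix.result(), it.toList)
  }
}
```

With the buffered version the prefix and the remainder together still contain every element, which is what a sentinel-at-the-start grouping needs.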

    groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0).foreach(group => println(group.sum))
    groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum))
5 answers

Here is my solution using BufferedIterator. It does not let you skip iterators correctly, but it is fairly simple and functional. The first element goes into a group even if !startsGroup(first).

    def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
      new Iterator[Iterator[T]] {
        val base = iter.buffered
        override def hasNext = base.hasNext
        override def next() = Iterator(base.next()) ++ new Iterator[T] {
          override def hasNext = base.hasNext && !startsGroup(base.head)
          override def next() = if (hasNext) base.next() else Iterator.empty.next()
        }
      }

Update: Keeping a little state lets you skip iterators and prevents people from messing with previous ones:

    def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
      new Iterator[Iterator[T]] {
        val base = iter.buffered
        var prev: Iterator[T] = Iterator.empty
        // Exhaust previous iterator; take* and drop* do NOT always work!! (Jira SI-5002?)
        private def drainPrev(): Unit = while (prev.hasNext) prev.next()
        // Drain first, so an unread trailing group is not reported as one more group
        override def hasNext = { drainPrev(); base.hasNext }
        override def next() = {
          drainPrev()
          prev = Iterator(base.next()) ++ new Iterator[T] {
            var hasMore = true
            override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head); hasMore }
            override def next() = if (hasNext) base.next() else Iterator.empty.next()
          }
          prev
        }
      }
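A small usage sketch on a tiny input may help when checking this kind of solution. The function is restated compactly so the example runs standalone. The object name, the exception messages, and the choice to drain the previous group inside the outer hasNext are assumptions of this sketch, made so that a trailing group that is never read cannot leave hasNext reporting a group that does not exist.

```scala
object GroupByDemo {
  // Restated so the example runs standalone: same BufferedIterator idea as above.
  def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
    new Iterator[Iterator[T]] {
      private val base = iter.buffered
      private var prev: Iterator[T] = Iterator.empty

      // Assumption: draining here keeps hasNext accurate when a group was left unread.
      def hasNext: Boolean = {
        while (prev.hasNext) prev.next()
        base.hasNext
      }

      def next(): Iterator[T] = {
        if (!hasNext) throw new NoSuchElementException("no more groups")
        val first = base.next() // the first element always opens a group
        val rest = new Iterator[T] {
          private var more = true // sticky: once a group has ended it stays ended
          def hasNext: Boolean = {
            more = more && base.hasNext && !startsGroup(base.head)
            more
          }
          def next(): T =
            if (hasNext) base.next() else throw new NoSuchElementException("end of group")
        }
        prev = Iterator.single(first) ++ rest
        prev
      }
    }

  // Sum every group of 1..10, starting a new group at each multiple of 4.
  def sums(): List[Int] =
    groupby((1 to 10).iterator)(_ % 4 == 0).map(_.sum).toList

  // Read only the first element of each group; the remainder is skipped.
  def heads(): List[Int] =
    groupby((1 to 10).iterator)(_ % 4 == 0).map(_.next()).toList
}
```

Here the groups are 1 to 3, 4 to 7, and 8 to 10, so sums() yields List(6, 22, 27) and heads() yields List(1, 4, 8).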

You have an inherent problem. Iterable implies that you can get multiple iterators. Iterator implies that you can only pass through once. That means your Iterable[Iterable[T]] should be able to produce Iterator[Iterable[T]]s. But when one of these returns an element (an Iterable[T]) and that element is asked for multiple iterators, the single underlying iterator cannot comply without either caching the results of the list (too big) or calling the original iterable and going through absolutely everything again (very inefficient).
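The difference between the two contracts can be seen in a minimal sketch; the object and method names are illustrative assumptions. An Iterable hands out a fresh iterator on every call, while an Iterator is used up by a single traversal.

```scala
object SinglePassDemo {
  // An Iterable produces a new iterator each time, so it can be traversed repeatedly.
  def iterableTwice(): (Int, Int) = {
    val xs: Iterable[Int] = List(1, 2, 3)
    (xs.iterator.sum, xs.iterator.sum)
  }

  // An Iterator is consumed by its first traversal; afterwards nothing is left.
  def iteratorTwice(): (List[Int], Boolean) = {
    val it: Iterator[Int] = Iterator(1, 2, 3)
    val firstPass = List.newBuilder[Int]
    while (it.hasNext) firstPass += it.next()
    (firstPass.result(), it.hasNext)
  }
}
```

iterableTwice() returns (6, 6), whereas iteratorTwice() returns (List(1, 2, 3), false): the second look at the iterator finds it empty.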

So, while you could do this, I think you should conceive of your problem in a different way.

If you could start with a Seq instead, you could grab the subsets as ranges.

If you already know how you want to use your iterable, you could write a method

    def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: (T => Unit)*)

which advances to the next handler each time starts returns true. If there is any way you can do your processing in one pass, something like this is the way to go. (Your handlers will need to keep state in mutable data structures or variables, however.)
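A minimal sketch of such a process method is given here, under stated assumptions: the body is hypothetical (the answer only gives the signature), the first element is assumed to always open a part, and parts beyond the supplied handlers are assumed to be ignored.

```scala
object ProcessDemo {
  // Hypothetical sketch of `process`: one pass over the source, handing each
  // element to the handler that belongs to the current part.
  def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: (T => Unit)*): Unit = {
    var current = -1 // index of the active handler; -1 until the first element is seen
    for (elem <- source) {
      // Assumption: the first element always opens a part, like the other answers here.
      if (current < 0 || starts(elem)) current += 1
      // Assumption: parts beyond the supplied handlers are silently ignored.
      if (current < handlers.length) handlers(current)(elem)
    }
  }

  // Handlers keep their own state in mutable variables.
  def demo(): (Int, Int) = {
    var sumA = 0
    var sumB = 0
    process(1 to 6)(_ % 4 == 0)(x => sumA += x, x => sumB += x)
    (sumA, sumB)
  }
}
```

In demo(), the elements 1 to 3 go to the first handler and 4 to 6 go to the second, so the result is (6, 15). Nothing is buffered, at the cost of having to know the handlers up front.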

If you can permit iteration over the outer list to break the inner lists, you could have an Iterable[Iterator[T]] with the additional constraint that once you iterate to a later sub-iterator, all earlier sub-iterators are invalid.


Here is a solution of the last type (from Iterator[T] to Iterator[Iterator[T]]; it can be wrapped to make the outer layers Iterable instead).

    class GroupedBy[T](source: Iterator[T])(starts: T => Boolean)
        extends Iterator[Iterator[T]] {
      private val underlying = source
      private var saved: T = _
      private var cached = false
      private var starting = false

      // Pull one element from the source and remember whether it starts a group
      private def cacheNext(): Unit = {
        saved = underlying.next()
        starting = starts(saved)
        cached = true
      }

      // Typed as Nothing so it can be used where a T is expected
      private def oops(): Nothing =
        throw new java.util.NoSuchElementException("empty iterator")

      // Comment the next line if you do NOT want the first element to always start a group
      if (underlying.hasNext) { cacheNext(); starting = true }

      // Skips whatever is left of the current group until a group start is cached
      def hasNext: Boolean = {
        while (!(cached && starting) && underlying.hasNext) cacheNext()
        cached && starting
      }

      def next(): Iterator[T] = {
        if (!(cached && starting) && !hasNext) oops()
        starting = false
        new Iterator[T] {
          var presumablyMore = true
          def hasNext: Boolean = {
            if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext()
            presumablyMore = cached && !starting
            presumablyMore
          }
          def next(): T = {
            if (presumablyMore && (cached || hasNext)) {
              cached = false
              saved
            }
            else oops()
          }
        }
      }
    }

If you are looking at memory constraints, then the following will work. You can only use it if your underlying iterable object supports views. This implementation iterates over the Iterable and generates IterableViews, which can then be iterated over. It does not care whether the very first element tests as a group start, since that element starts a group regardless.

    import scala.collection.IterableView

    def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] =
      new Iterable[Iterable[T]] {
        def iterator = new Iterator[Iterable[T]] {
          val i = iter.iterator
          var index = 0
          var nextView: IterableView[T, Iterable[T]] = getNextView()

          // Advance to the next group start (or the end) and return a view over the group just passed
          private def getNextView() = {
            val start = index
            var hitStartGroup = false
            while ( i.hasNext && ! hitStartGroup ) {
              val next = i.next()
              index += 1
              hitStartGroup = ( index > 1 && startsGroup( next ) )
            }
            if ( hitStartGroup ) {
              if ( start == 0 ) iter.view( start, index - 1 )
              else iter.view( start - 1, index - 1 )
            } else { // hit end
              if ( start == index ) null
              else if ( start == 0 ) iter.view( start, index )
              else iter.view( start - 1, index )
            }
          }

          def hasNext = nextView != null

          def next() = {
            if ( nextView != null ) {
              val next = nextView
              nextView = getNextView()
              next
            } else null
          }
        }
      }

You can keep a small memory footprint with Streams. Use result.toIterator if you want an iterator again.

With Streams, there is no mutable state, only a single conditional, and it is almost as concise as Jay Hacker's solution.

    def batchBy[A, B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = {
      val base = iter.buffered
      val empty = Stream.empty[(B, Iterator[A])]

      def getBatch(key: B) = {
        Iterator(base.next()) ++ new Iterator[A] {
          def hasNext: Boolean = base.hasNext && (f(base.head) == key)
          def next(): A = base.next()
        }
      }

      def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = {
        skipList.foreach { _.foreach { _ => } }
        if (base.isEmpty) empty
        else {
          val key = f(base.head)
          val batch = getBatch(key)
          Stream.cons((key, batch), next(Some(batch)))
        }
      }

      next()
    }

I ran the tests:

    scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0)
             .foreach { case (_, group) => println(group.sum) }
    -1610612735
    1073741823
    -536870909
    2147483646
    2147483647

The second test prints too much to insert into Stack Overflow.

    import scala.collection.mutable.ArrayBuffer

    object GroupingIterator {

      /**
       * Create a new GroupingIterator with a grouping predicate.
       *
       * @param it The original iterator
       * @param p Predicate controlling the grouping
       * @tparam A Type of elements iterated
       * @return A new GroupingIterator
       */
      def apply[A](it: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean): GroupingIterator[A] =
        new GroupingIterator(it)(p)
    }

    /**
     * Group elements in sequences of contiguous elements that satisfy a predicate. The predicate
     * tests each single potential next element of the group with the help of the elements grouped so far.
     * If it returns true, the potential next element is added to the group, otherwise
     * a new group is started with the potential next element as first element
     *
     * @param self The original iterator
     * @param p Predicate controlling the grouping
     * @tparam A Type of elements iterated
     */
    class GroupingIterator[+A](self: Iterator[A])(p: (A, IndexedSeq[A]) => Boolean)
        extends Iterator[IndexedSeq[A]] {

      private[this] val source = self.buffered
      private[this] val buffer: ArrayBuffer[A] = ArrayBuffer()

      def hasNext: Boolean = source.hasNext

      def next(): IndexedSeq[A] = {
        if (hasNext) nextGroup()
        else Iterator.empty.next()
      }

      private[this] def nextGroup(): IndexedSeq[A] = {
        assert(source.hasNext)
        buffer.clear()
        buffer += source.next()
        while (source.hasNext && p(source.head, buffer)) {
          buffer += source.next()
        }
        buffer.toIndexedSeq
      }
    }

Source: https://habr.com/ru/post/902146/

