How to cache and replay the items of a Supplier<Stream<T>>
Given a Supplier<Stream<T>> dataSrc, I would like to cache the Stream elements for further traversals of the same sequence of elements. Here, assume that dataSrc always produces the same sequence (for example, a Stream<Integer> with the temperatures in Celsius for March; see the usage example below). So option 1) is to collect the Stream elements first, but that wastes one extra traversal just to add those elements to the collection:
Supplier<Stream<T>> dataSrc = ...
List<T> cache = dataSrc.get().collect(toList()); // **Additional** traversal to collect items
cache.stream().reduce(…) // 1st traversal
cache.stream().reduce(…) // 2nd traversal
... // Nth traversals
I would like to avoid both the additional traversal for collecting the elements and the explicit cache variable, hiding the cache inside the Supplier<> so that the elements are implicitly cached during the first traversal and read from that cache on further traversals. I think this is similar to the idea of the cache() method of Project Reactor for reactive streams.
So I outline an alternative in the following cache() implementation, although it already has (at least) two problems: 1) onClose is not called when the traversal finishes (and I cannot figure out any way to detect the end of a traversal); 2) if the first traversal never ends, the cache will never be filled.
Supplier<Stream<T>> dataSrc = cache(...)
dataSrc.get().reduce(…) // 1st traversal
dataSrc.get().reduce(…) // 2nd traversal
... // Nth traversals
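static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
    final List<T> cache = new ArrayList<>();
    final AtomicBoolean started = new AtomicBoolean();
    final AtomicBoolean isCached = new AtomicBoolean();
    return () -> {
        // Replay from the cache once the first traversal has been marked as finished.
        if (isCached.get()) return cache.stream();
        // First call: record every element while it is traversed.
        if (!started.getAndSet(true)) {
            return dataSrc
                .get()
                .peek(cache::add)
                // Problem 1): only runs if the caller closes the returned Stream.
                .onClose(() -> isCached.set(true));
        }
        // Calls made before the cache is complete fall back to the source.
        return dataSrc.get();
    };
}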
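Problem 1) follows from how the Stream API is specified: onClose handlers run when close() is invoked (explicitly or via try-with-resources), not when a terminal operation finishes. A small self-contained check of that behavior (the class and method names are illustrative, not part of the code above):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

class OnCloseDemo {
    // Returns {handler ran after the terminal operation, handler ran after close()}.
    static boolean[] check() {
        AtomicBoolean closed = new AtomicBoolean();
        Stream<Integer> s = Stream.of(1, 2, 3).onClose(() -> closed.set(true));
        s.forEach(x -> { });                    // terminal operation consumes the stream...
        boolean afterTerminalOp = closed.get(); // ...but the onClose handler has not run yet
        s.close();                              // only close() triggers the handler
        return new boolean[] {afterTerminalOp, closed.get()};
    }

    public static void main(String[] args) {
        boolean[] r = check();
        // prints "after forEach: false, after close(): true"
        System.out.println("after forEach: " + r[0] + ", after close(): " + r[1]);
    }
}
```

So a cache() that relies on onClose only sets isCached if every caller closes the returned Stream, e.g. with try (Stream<T> s = dataSrc.get()) { ... }.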
Question:
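Is there a way to implement cache() so that the Stream<T> elements are cached during the first traversal (without an extra traversal just to collect them), and further traversals of the returned Streams read them from that cache?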
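Usage example:
Consider the March temperatures from the World Weather Online API, fetched with AsyncHttpClient from the following URI.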
import static org.asynchttpclient.Dsl.asyncHttpClient;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.asynchttpclient.Response;

Pattern pat = Pattern.compile("\\n");
boolean[] isEven = {true};
CompletableFuture<Stream<Integer>> temps = asyncHttpClient()
    .prepareGet("http://api.worldweatheronline.com/premium/v1/past-weather.ashx?q=37.017,-7.933&date=2018-03-01&enddate=2018-03-31&tp=24&format=csv&key=715b185b36034a4c879141841182802")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(pat::splitAsStream)
    .thenApply(str -> str
        .filter(w -> !w.startsWith("#"))     // Filter comments
        .skip(1)                             // Skip line: Not Available
        .filter(l -> isEven[0] = !isEven[0]) // Keep only the even lines (stateful toggle)
        .map(line -> line.substring(14, 16)) // Extract temperature in Celsius
        .map(Integer::parseInt)
    );
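Here we get a CompletableFuture<Stream<Integer>>, which we would like to use like a Supplier<Stream<Integer>>. However, the CompletableFuture caches the Stream instance itself, not its elements, so that Stream can be traversed only once.
1. Traversing it twice throws IllegalStateException: stream has already been operated upon or closed: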
out.println(temps.join().distinct().count());
out.println(temps.join().max(Integer::compare)); // throws IllegalStateException
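The exception comes from join() returning the same Stream object every time, while a Stream can be consumed only once. A minimal offline reproduction, where CompletableFuture.completedFuture stands in for the HTTP request and the temperatures are made-up values:

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

class StreamReuseDemo {
    // Returns the message of the exception raised by the second traversal.
    static String secondTraversalError() {
        // completedFuture replaces the HTTP call; the values are illustrative only.
        CompletableFuture<Stream<Integer>> temps =
                CompletableFuture.completedFuture(Stream.of(21, 19, 21, 23));
        long distinct = temps.join().distinct().count(); // 1st traversal consumes the Stream
        try {
            temps.join().max(Integer::compare);          // join() returns the same Stream object
            return "no error after " + distinct + " distinct values";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // prints "stream has already been operated upon or closed"
        System.out.println(secondTraversalError());
    }
}
```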
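2. Collecting the elements into a List works, but it costs an extra traversal, so 3 traversals are performed instead of 2: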
CompletableFuture<List<Integer>> list = temps.thenApply(str -> str.collect(toList()));
out.println(list.join().stream().distinct().count()); // 2 traversals
out.println(list.join().stream().distinct().max(Integer::compare));// 1 traversal
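Solution: wrap the source in a caching Supplier that records the elements during the first traversal and replays them from the cache afterwards, so only 2 traversals are performed: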
Supplier<Stream<Integer>> cache = Cache.of(temps::join);
out.println(cache.get().distinct().count()); // 1 traversal
out.println(cache.get().max(Integer::compare)); // 1 traversal from cache
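As far as I can tell, the only way to intercept the traversal of a Stream is through its iterator() or spliterator(). So, to get a replayable Stream, I wrap the source Spliterator in an object that records each element it reads (Recorder), and every Stream returned by the Supplier is built on a Spliterator that reads through it (cacheIterator()). The getOrAdvance() method of Recorder is synchronized, so several Streams obtained from the same Supplier can share the Recorder safely.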
So Cache.of(dataSrc) sets up the following chain:
dataSrc ----> Recorder ----> cacheIterator() ----> Stream
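Notes:
- Cache.of() does not support parallelism: each Stream is created with StreamSupport.stream(iter, false). Supporting it would require cacheIterator() to return a Spliterator that splits by index range, similar to AbstractList.RandomAccessSpliterator.
- There may be a simpler alternative to Recorder/cacheIterator(); suggestions are welcome.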
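Intercepting the Spliterator, as Recorder does, is also what makes the end of a traversal observable (problem 1 in the question): the source is exhausted when its tryAdvance() returns false. A minimal sketch of that idea, where withEndHook is a hypothetical helper name; a short-circuited traversal such as limit(2) never reaches the end, so the hook does not fire:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class EndDetection {
    // Hypothetical helper: re-streams `source` and runs `onEnd` once the source is exhausted.
    static <T> Stream<T> withEndHook(Stream<T> source, Runnable onEnd) {
        Spliterator<T> src = source.spliterator();
        Spliterator<T> hooked = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, src.characteristics() & Spliterator.ORDERED) {
            private boolean ended = false;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (ended) return false;
                boolean advanced = src.tryAdvance(action);
                if (!advanced) {
                    ended = true;
                    onEnd.run(); // tryAdvance returned false: end of the source reached
                }
                return advanced;
            }
        };
        return StreamSupport.stream(hooked, false);
    }

    public static void main(String[] args) {
        withEndHook(Stream.of(1, 2, 3), () -> System.out.println("end reached"))
                .forEach(System.out::println); // prints 1, 2, 3, then "end reached"
        withEndHook(Stream.of(1, 2, 3), () -> System.out.println("end reached"))
                .limit(2)
                .forEach(System.out::println); // prints 1, 2 only: the hook never fires
    }
}
```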
Example with a Supplier of random numbers, nrs, and its replayable version, nrsReplay:
Random rnd = new Random();
Supplier<Stream<String>> nrs = () -> Stream.generate(() -> rnd.nextInt(99)).map(Object::toString);
IntStream.range(1, 6).forEach(size -> out.println(nrs.get().limit(size).collect(joining(","))));
System.out.println();
Supplier<Stream<String>> nrsReplay = Cache.of(nrs);
IntStream.range(1, 6).forEach(size -> out.println(nrsReplay.get().limit(size).collect(joining(","))));
Output:
32
65,94
94,19,34
72,77,66,18
88,41,34,97,28

93
93,65
93,65,71
93,65,71,40
93,65,71,40,68
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class Cache {
    public static <T> Supplier<Stream<T>> of(Supplier<Stream<T>> dataSrc) {
        // The source Stream is obtained once, eagerly, when of() is called.
        final Spliterator<T> src = dataSrc.get().spliterator();
        final Recorder<T> rec = new Recorder<>(src);
        return () -> {
            // CacheIterator starts on index 0 and reads data from src or
            // from an internal cache of Recorder.
            Spliterator<T> iter = rec.cacheIterator();
            return StreamSupport.stream(iter, false);
        };
    }

    static class Recorder<T> {
        final Spliterator<T> src;
        final List<T> cache = new ArrayList<>();
        final long estimateSize;
        boolean hasNext = true;

        public Recorder(Spliterator<T> src) {
            this.src = src;
            this.estimateSize = src.estimateSize();
        }

        public synchronized boolean getOrAdvance(
                final int index,
                Consumer<? super T> cons) {
            if (index < cache.size()) {
                // If it is in cache then just get it from the corresponding index.
                cons.accept(cache.get(index));
                return true;
            } else if (hasNext) {
                // If not in cache then advance the src iterator and record the item.
                hasNext = src.tryAdvance(item -> {
                    cache.add(item);
                    cons.accept(item);
                });
            }
            return hasNext;
        }

        public Spliterator<T> cacheIterator() {
            return new Spliterators.AbstractSpliterator<T>(
                    estimateSize, src.characteristics()
            ) {
                int index = 0;

                public boolean tryAdvance(Consumer<? super T> cons) {
                    return getOrAdvance(index++, cons);
                }

                public Comparator<? super T> getComparator() {
                    return src.getComparator();
                }
            };
        }
    }
}
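You can use Guava's Suppliers#memoize to turn a given Supplier into a caching ("memoizing") one:
- Turn your dataSrc Supplier<Stream<T>> into a Supplier<List<T>> that collects the stream
- Wrap it with Suppliers#memoize
This would be your cache() method: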
import com.google.common.base.Suppliers;
import static java.util.stream.Collectors.toList;

private static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
    Supplier<List<T>> memoized = Suppliers.memoize(() -> dataSrc.get().collect(toList()));
    return () -> memoized.get().stream();
}
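(When mixing in Guava you might need to switch between Guava's com.google.common.base.Supplier and java.util.function.Supplier; they can easily be converted back and forth, but in this case it is not even necessary.)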
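For example, take a simple Integer stream that produces the first 5 natural numbers and reports each computation to stdout: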
private static Supplier<Stream<Integer>> getDataSrc() {
    return () -> IntStream.generate(new IntSupplier() {
        private int i = 0;

        @Override
        public int getAsInt() {
            System.out.println("Computing next i: " + (i + 1));
            return i += 1;
        }
    }).limit(5).boxed();
}
Running it without memoization:
Supplier<Stream<Integer>> dataSrc = getDataSrc();
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));
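Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]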
With memoization:
Supplier<Stream<Integer>> dataSrc = cache(getDataSrc());
System.out.println(dataSrc.get().collect(toList()));
System.out.println(dataSrc.get().collect(toList()));
Computing next i: 1
Computing next i: 2
Computing next i: 3
Computing next i: 4
Computing next i: 5
[1, 2, 3, 4, 5]
[1, 2, 3, 4, 5]
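Without Guava on the classpath, the same memoizing idea can be sketched with plain JDK types. The memoize method below is a hypothetical stand-in, not Guava's implementation; it uses double-checked locking on a volatile flag so the delegate runs at most once:

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Memo {
    // Hypothetical JDK-only stand-in for Suppliers.memoize:
    // the delegate is invoked at most once, on the first get().
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private volatile boolean initialized;
            private T value;

            @Override
            public T get() {
                if (!initialized) {
                    synchronized (this) {
                        if (!initialized) {
                            value = delegate.get();
                            initialized = true; // volatile write publishes `value`
                        }
                    }
                }
                return value;
            }
        };
    }

    // Same shape as the Guava-based cache() above.
    static <T> Supplier<Stream<T>> cache(Supplier<Stream<T>> dataSrc) {
        Supplier<List<T>> memoized = memoize(() -> dataSrc.get().collect(Collectors.toList()));
        return () -> memoized.get().stream();
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> src = () -> {
            System.out.println("building the source stream");
            return Stream.of(1, 2, 3);
        };
        Supplier<Stream<Integer>> cached = cache(src);
        System.out.println(cached.get().collect(Collectors.toList())); // message, then [1, 2, 3]
        System.out.println(cached.get().collect(Collectors.toList())); // [1, 2, 3] only
    }
}
```

Unlike the Recorder approach, this collects the whole source on the first get(), so the first caller pays for a full traversal even if it only needs a prefix (e.g. limit(2)), and an infinite source would never complete.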