The buffering aspect of this question can be achieved using multicast trickery, but it's much easier for me to write an operator for it so that the data and context are in the same accessible place:
/**
 * Lift-operator that groups upstream items into lists.
 *
 * <p>A buffer is opened by the first item that arrives after the previous
 * buffer was emitted. It is emitted downstream when either
 * <ul>
 *   <li>{@code timeout} has elapsed since that first item arrived, or</li>
 *   <li>the buffer holds {@code maxSize} items,</li>
 * </ul>
 * whichever happens first. On completion a non-empty remainder is emitted
 * before {@code onCompleted}. On error the remainder is discarded.
 *
 * @param <T> the element type of the upstream sequence
 */
public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
    final Scheduler scheduler;
    final long timeout;
    final TimeUnit unit;
    final int maxSize;

    /**
     * @param timeout   how long a buffer may stay open, counted from its first item
     * @param unit      the unit of {@code timeout}
     * @param scheduler the scheduler whose worker runs the timeout task
     * @param maxSize   the number of items that triggers an immediate emission;
     *                  values below 1 behave like 1 (every item is emitted on its own)
     */
    public OperatorBufferFirst(
            long timeout, TimeUnit unit,
            Scheduler scheduler, int maxSize) {
        this.timeout = timeout;
        this.unit = unit;
        this.scheduler = scheduler;
        this.maxSize = maxSize;
    }

    /**
     * Creates the buffering subscriber for one downstream subscriber.
     *
     * <p>The downstream is wrapped in a {@code SerializedSubscriber} because
     * buffers are handed to it both from {@code onNext} and from the timeout
     * task scheduled on the worker.
     */
    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> t) {
        BufferSubscriber<T> parent = new BufferSubscriber<>(
                new SerializedSubscriber<>(t),
                timeout, unit,
                scheduler.createWorker(), maxSize);
        // Tie the parent's lifetime to the downstream subscriber.
        t.add(parent);
        return parent;
    }

    /**
     * Holds the current buffer and decides when it is emitted.
     *
     * <p>{@code buffer} and {@code index} are only read and written while
     * holding the monitor of {@code this}. {@code index} identifies the
     * current buffer generation: a timeout task remembers the generation it
     * was started for and does nothing if the generation has moved on.
     */
    static final class BufferSubscriber<T>
            extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;
        final Scheduler.Worker w;
        final long timeout;
        final TimeUnit unit;
        final int maxSize;
        /** Holds the currently scheduled timeout task, if any. */
        final SerialSubscription timer;
        /** Current buffer; {@code null} once a terminal event was received. */
        List<T> buffer;
        /** Generation counter of {@code buffer}. */
        long index;

        public BufferSubscriber(
                Subscriber<? super List<T>> actual,
                long timeout,
                TimeUnit unit,
                Scheduler.Worker w,
                int maxSize) {
            this.actual = actual;
            this.timeout = timeout;
            this.unit = unit;
            this.w = w;
            this.maxSize = maxSize;
            this.timer = new SerialSubscription();
            this.buffer = new ArrayList<>();
            this.add(timer);
            this.add(w);
        }

        /**
         * Appends the item to the current buffer, then either emits the
         * buffer (size bound reached), starts the timeout (first item of a
         * new buffer), or does nothing further.
         */
        @Override
        public void onNext(T t) {
            List<T> b;
            boolean startTimer = false;
            long idx;
            synchronized (this) {
                b = buffer;
                if (b == null) {
                    // Already terminated: ignore late items instead of failing on the null buffer.
                    return;
                }
                b.add(t);
                idx = index;
                int n = b.size();
                // The size bound is checked first so that maxSize == 1 emits
                // single-element buffers instead of waiting for a second item.
                if (n >= maxSize) {
                    buffer = new ArrayList<>();
                    index = idx + 1;
                } else if (n == 1) {
                    startTimer = true;
                } else {
                    return;
                }
            }
            if (startTimer) {
                final long fidx = idx;
                timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
            } else {
                // Replacing the timer's content cancels a pending timeout for the buffer just closed.
                timer.set(Subscriptions.unsubscribed());
                actual.onNext(b);
            }
        }

        /**
         * Cancels the pending timeout, discards the buffered items and
         * forwards the error.
         */
        @Override
        public void onError(Throwable e) {
            timer.unsubscribe();
            synchronized (this) {
                buffer = null;
                // Invalidate any timeout task that is already about to run.
                index++;
            }
            actual.onError(e);
        }

        /**
         * Cancels the pending timeout, emits the remaining items (if any)
         * and completes the downstream.
         */
        @Override
        public void onCompleted() {
            timer.unsubscribe();
            List<T> b;
            synchronized (this) {
                b = buffer;
                buffer = null;
                // Invalidate any timeout task that is already about to run.
                index++;
            }
            if (b != null && !b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }

        /**
         * Timeout task body: emits the current buffer if it is still the
         * generation the task was scheduled for.
         *
         * @param idx the buffer generation captured when the timer was started
         */
        public void timeout(long idx) {
            List<T> b;
            synchronized (this) {
                if (idx != index) {
                    // The buffer was already emitted by size or a terminal event.
                    return;
                }
                b = buffer;
                buffer = new ArrayList<>();
                index = idx + 1;
            }
            actual.onNext(b);
        }
    }

    /** Small demonstration driven by a test scheduler and a subject. */
    public static void main(String[] args) {
        TestScheduler s = Schedulers.test();
        PublishSubject<Integer> source = PublishSubject.create();
        source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
                .subscribe(System.out::println, Throwable::printStackTrace,
                        () -> System.out.println("Done"));
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        s.advanceTimeBy(1, TimeUnit.SECONDS);
        source.onNext(5);
        source.onNext(6);
        s.advanceTimeBy(1, TimeUnit.SECONDS);
        s.advanceTimeBy(1, TimeUnit.SECONDS);
        source.onNext(7);
        source.onCompleted();
    }
}
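Edit: manual flushing can be added as well.
To do that, reserve a special sentinel value FLUSH of type T; when onNext receives FLUSH, the current buffer is emitted immediately, otherwise the item is buffered as before. The synchronized section of onNext then becomes: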
// Critical section of onNext with support for the FLUSH sentinel.
// Outcomes: emit == true (hand b downstream), startTimer == true
// (first item of a new buffer), or an early return (nothing to do yet).
synchronized (this) {
    b = buffer;
    idx = index;
    if (t == FLUSH) {
        // Sentinel (compared by identity): close the current buffer without adding anything.
        buffer = new ArrayList<>();
        index = ++idx;
        emit = true;
    } else {
        b.add(t);
        int n = b.size();
        // Check the size bound first so that maxSize == 1 emits
        // single-element buffers instead of waiting for a second item.
        if (n >= maxSize) {
            buffer = new ArrayList<>();
            index = ++idx;
            emit = true;
        } else if (n == 1) {
            startTimer = true;
        } else {
            return;
        }
    }
}