Rx buffer with a timeout measured from the first element of each new group

I'm quite new to the Rx world, and I need to implement the following behavior:

I need an observable that collects values and emits them as a list once it has at least N elements, or once time T has elapsed since the first element of the group appeared.

I read the documentation again and again, sure that I would end up using

buffer(timespan, unit, count[, scheduler])

But the problem is that its timespan is relative to the previous group of elements, not to the first element of the new group.

And, if possible, I will also need to flush (force the emission of) the current buffer, because some elements should be processed immediately. Am I right to assume that for such a case I will need a second observable, with the processing executed before each element, and then merge both?

Any idea?

PS: I work in Java, but I do not need Java code; an explanation will be enough.

Thanks!

+4
source share
1 answer

The buffering aspect of this question can be achieved using multicast trickery, but it’s much easier for me to write an operator for it so that the data and context are in the same accessible place:

/**
 * Operator (for use with {@code lift}) that collects upstream values into lists and
 * emits the current list when either {@code maxSize} values have been collected or
 * {@code timeout} has elapsed since the first value of that list arrived, whichever
 * happens first. Any non-empty remainder is emitted when the upstream completes.
 *
 * @param <T> the element type of the upstream sequence
 */
public final class OperatorBufferFirst<T> implements Operator<List<T>, T> {
    final Scheduler scheduler;
    final long timeout;
    final TimeUnit unit;
    final int maxSize;

    /**
     * Creates the operator.
     *
     * @param timeout   maximum time a buffer may stay open, counted from its first element
     * @param unit      unit of {@code timeout}
     * @param scheduler scheduler whose worker runs the timeout task
     * @param maxSize   number of elements that triggers an immediate emission; must be positive
     * @throws NullPointerException     if {@code unit} or {@code scheduler} is null
     * @throws IllegalArgumentException if {@code maxSize} is not positive
     */
    public OperatorBufferFirst(
            long timeout, TimeUnit unit,
            Scheduler scheduler, int maxSize) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (scheduler == null) {
            throw new NullPointerException("scheduler");
        }
        if (maxSize <= 0) {
            throw new IllegalArgumentException(
                    "maxSize > 0 required but it was " + maxSize);
        }
        this.timeout = timeout;
        this.unit = unit;
        this.scheduler = scheduler;
        this.maxSize = maxSize;
    }

    /**
     * Wraps the downstream subscriber in a {@code SerializedSubscriber} (emissions may
     * come from both the upstream thread and the timer worker) and returns the
     * buffering subscriber that should be subscribed to the upstream.
     */
    @Override
    public Subscriber<? super T> call(
            Subscriber<? super List<T>> t) {
        BufferSubscriber<T> parent = new BufferSubscriber<>(
                new SerializedSubscriber<>(t),
                timeout, unit,
                scheduler.createWorker(), maxSize);
        // Unsubscribing the downstream also unsubscribes the parent (and thus timer + worker).
        t.add(parent);
        return parent;
    }

    /**
     * Subscriber holding the current buffer. The {@code buffer} and {@code index}
     * fields are only read or written while holding this object's monitor; the
     * downstream is always called outside of the monitor.
     */
    static final class BufferSubscriber<T>
    extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;
        final Scheduler.Worker w;
        final long timeout;
        final TimeUnit unit;
        final int maxSize;
        /** Holds the currently scheduled timeout task, if any. */
        final SerialSubscription timer;

        /** Current open buffer; {@code null} once the sequence has terminated. */
        List<T> buffer;
        /**
         * Generation counter of {@link #buffer}; incremented each time the buffer is
         * swapped out so that a stale timeout task can detect it is no longer relevant.
         */
        long index;

        public BufferSubscriber(
                Subscriber<? super List<T>> actual,
                long timeout,
                TimeUnit unit,
                Scheduler.Worker w,
                int maxSize) {
            this.actual = actual;
            this.timeout = timeout;
            this.unit = unit;
            this.w = w;
            this.maxSize = maxSize;
            this.timer = new SerialSubscription();
            this.buffer = new ArrayList<>();
            this.add(timer);
            this.add(w);
        }

        /**
         * Appends the value to the current buffer. Emits the buffer if it reached
         * {@code maxSize}; otherwise, if this was the buffer's first value, starts
         * the timeout for it.
         */
        @Override
        public void onNext(T t) {
            List<T> b;
            boolean startTimer = false;
            long idx;
            synchronized (this) {
                b = buffer;
                if (b == null) {
                    // Terminated already: ignore late values instead of failing with an NPE.
                    return;
                }
                b.add(t);
                idx = index;
                int n = b.size();
                // The size check must come first: with maxSize == 1 the first element
                // already fills the buffer and has to be emitted right away.
                if (n >= maxSize) {
                    buffer = new ArrayList<>();
                    index = ++idx;
                } else if (n == 1) {
                    startTimer = true;
                } else {
                    return;
                }
            }

            if (startTimer) {
                final long fidx = idx;
                // Capture the generation so the task only fires for this very buffer.
                timer.set(w.schedule(() -> timeout(fidx), timeout, unit));
            } else {
                // Buffer is full: cancel its pending timeout and emit it.
                timer.set(Subscriptions.unsubscribed());
                actual.onNext(b);
            }
        }

        /**
         * Cancels the pending timeout, drops the current buffer and forwards the error.
         */
        @Override
        public void onError(Throwable e) {
            timer.unsubscribe();
            synchronized (this) {
                buffer = null;
                // Invalidate any timeout task that is already running.
                index++;
            }
            actual.onError(e);
        }

        /**
         * Cancels the pending timeout, emits the remaining values (if any) and completes.
         */
        @Override
        public void onCompleted() {
            timer.unsubscribe();
            List<T> b;
            synchronized (this) {
                b = buffer;
                buffer = null;
                index++;
            }
            if (b != null && !b.isEmpty()) {
                actual.onNext(b);
            }
            actual.onCompleted();
        }

        /**
         * Invoked by the scheduled task: emits the current buffer, but only if it is
         * still the buffer generation the task was scheduled for.
         *
         * @param idx the value of {@code index} at the time the task was scheduled
         */
        public void timeout(long idx) {
            List<T> b;
            synchronized (this) {
                b = buffer;
                if (idx != index) {
                    // The buffer was already emitted (size limit or termination).
                    return;
                }
                buffer = new ArrayList<>();
                index = idx + 1;
            }

            actual.onNext(b);
        }
    }

    /**
     * Small demonstration driven by a {@code TestScheduler}: buffers of up to 3 values
     * with a 1 second timeout.
     */
    public static void main(String[] args) {
        TestScheduler s = Schedulers.test();

        PublishSubject<Integer> source = PublishSubject.create();

        source.lift(new OperatorBufferFirst<>(1, TimeUnit.SECONDS, s, 3))
        .subscribe(System.out::println, Throwable::printStackTrace,
                () -> System.out.println("Done"));

        // Third value fills the buffer -> emitted by size.
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        // Single value, then the timeout elapses -> emitted by time.
        source.onNext(4);
        s.advanceTimeBy(1, TimeUnit.SECONDS);

        source.onNext(5);
        source.onNext(6);

        // First second emits the two buffered values; the second one finds nothing pending.
        s.advanceTimeBy(1, TimeUnit.SECONDS);
        s.advanceTimeBy(1, TimeUnit.SECONDS);

        // Remainder is flushed on completion.
        source.onNext(7);
        source.onCompleted();
    }
}

, .

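As for the forced flush: one option is to reserve a special sentinel value of type T (say, a constant FLUSH) and, whenever it arrives, emit the current buffer immediately. The synchronized section of onNext then changes as follows: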

// Variant of the synchronized section of onNext: the sentinel FLUSH is not stored as
// data but forces the current buffer to be swapped out and marked for emission.
synchronized (this) {
    b = buffer;
    idx = index;
    // Identity comparison: only the FLUSH instance itself takes the else branch below.
    if (t != FLUSH) {
        b.add(t);
        int n = b.size();
        if (n == 1) {
            // First element of this buffer: the caller should start the timeout.
            startTimer = true;
        } else
        if (n < maxSize) {
            // Buffer still filling up and its timer is already running.
            return;
        } else {
            // Size limit reached: swap in a fresh buffer and bump the generation.
            buffer = new ArrayList<>();
            index = ++idx;
            emit = true;
        }
    } else {
        // Forced flush: swap the buffer regardless of how many elements it holds.
        // NOTE(review): emit is set even when b is empty, so an empty list may be
        // emitted downstream - confirm that this is intended.
        buffer = new ArrayList<>();
        index = ++idx;
        emit = true;
    }
}
+7

Source: https://habr.com/ru/post/1616337/


All Articles