Observable vs Flowable in RxJava 2

I have been looking at the new RxJava 2, and I'm not quite sure I understand the idea of backpressure anymore...

I know that we have Observable, which does not support backpressure, and Flowable, which does.

So, as an example, let's say I have a Flowable with interval:

  Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  // do smth
              }
          });

This crashes after 128 values, which is pretty obvious: I am consuming items more slowly than I receive them.

But then we have the same thing with Observable:

  Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  // do smth
              }
          });

This does not crash at all; even if I slow down consumption, it still works. To make the Flowable work, let's say I add the onBackpressureDrop operator: the crash is gone, but not all values are emitted either.

So the basic question I cannot answer for myself is: why should I care about backpressure when I can use a plain Observable and still receive all the values without managing the buffer? Or, from the other side, what advantages does backpressure give me for managing and handling consumption?

+106
java android rx-java
Oct 29 '16 at 20:19
3 answers

In practice, backpressure manifests as bounded buffers: Flowable.observeOn has a 128-element buffer that is drained as fast as the downstream can take items. You can increase this buffer size individually to handle a bursty source, and all the backpressure-management practices from 1.x still apply. Observable.observeOn has an unbounded buffer that keeps collecting elements, so your application may run out of memory.
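The difference between a bounded and an unbounded buffer can be illustrated without RxJava. The following is a minimal plain-Java simulation, not RxJava code: the class `BufferSketch`, its method names, and the tick-based model (one item produced per tick, one item consumed every `consumeEvery` ticks) are assumptions made for illustration. Only the capacity of 128 comes from the answer above.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java simulation for illustration only; this is NOT RxJava code.
// Assumed model: the producer emits one item per tick and the consumer
// takes one item every `consumeEvery` ticks.
final class BufferSketch {

    // Returns the tick on which a buffer with `capacity` slots has no room
    // for the newly produced item, or -1 if that never happens.
    static int firstOverflowTick(int capacity, int consumeEvery, int totalTicks) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int tick = 1; tick <= totalTicks; tick++) {
            if (buffer.size() == capacity) {
                return tick; // bounded buffer is full: the overflow point
            }
            buffer.add(tick);
            if (tick % consumeEvery == 0) {
                buffer.poll(); // the slow consumer takes one item
            }
        }
        return -1;
    }

    // Same producer and consumer, but nothing limits the queue.
    // Returns how many items are still waiting after `totalTicks`.
    static int unboundedBacklog(int consumeEvery, int totalTicks) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int tick = 1; tick <= totalTicks; tick++) {
            buffer.add(tick);
            if (tick % consumeEvery == 0) {
                buffer.poll();
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("bounded(128) overflows at tick "
                + firstOverflowTick(128, 2, 10_000));
        System.out.println("unbounded backlog after 10000 ticks: "
                + unboundedBacklog(2, 10_000));
    }
}
```

In this model, with a consumer half as fast as the producer, the bounded buffer overflows on tick 256, while the unbounded queue holds 5000 items after 10000 ticks and would keep growing with a longer run.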

You may use Observable, for example, for:

  • handling GUI events
  • working with short sequences (fewer than 1000 elements in total)

You may use Flowable, for example, for:

  • cold and non-timed sources
  • generator-like sources
  • network and database accessors
+96
Oct 30 '16 at 7:24

Backpressure is when your observable (publisher) creates more events than your subscriber can handle. As a result, subscribers can miss events, or you can end up with a huge queue of events that eventually leads to running out of memory. Flowable takes backpressure into account. Observable does not. That's it.

It reminds me of a funnel that overflows when it holds too much liquid. Flowable can help prevent that:

With heavy backpressure:

[image]

But with Flowable, the backpressure is much lower:

[image]

RxJava 2 has several backpressure strategies that you can use depending on your use case. By strategy, I mean that RxJava 2 provides a way to handle the items that cannot be processed because of the overflow (backpressure).

Here are the strategies. I won't go through all of them, but, for example, if you don't want to worry about the items that overflow, you can use the drop strategy:

observable.toFlowable(BackpressureStrategy.DROP)

As far as I know, there is a limit of 128 items in the queue, after which there can be an overflow (backpressure). Even if it's not exactly 128, it's close to that number. Hope this helps someone.
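What dropping means for the items can be sketched with a simplified plain-Java model. This is not how RxJava implements `BackpressureStrategy.DROP`; the class `DropSketch`, the method `run`, and the tick-based producer and consumer are hypothetical and only illustrate the idea that an item arriving at a full 128-slot queue is discarded instead of causing an error.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of a "drop" strategy; NOT the RxJava implementation.
// Assumed model: one item is produced per tick and one item is consumed
// every `consumeEvery` ticks.
final class DropSketch {

    // Returns {delivered, dropped} after `totalTicks` ticks.
    static int[] run(int capacity, int consumeEvery, int totalTicks) {
        Queue<Integer> buffer = new ArrayDeque<>();
        int delivered = 0;
        int dropped = 0;
        for (int tick = 1; tick <= totalTicks; tick++) {
            if (buffer.size() < capacity) {
                buffer.add(tick);   // there is room: keep the item
            } else {
                dropped++;          // queue is full: discard the new item
            }
            if (tick % consumeEvery == 0 && !buffer.isEmpty()) {
                buffer.poll();      // the consumer handles one item
                delivered++;
            }
        }
        return new int[] {delivered, dropped};
    }

    public static void main(String[] args) {
        int[] result = run(128, 2, 1_000);
        System.out.println("delivered=" + result[0] + ", dropped=" + result[1]);
    }
}
```

With these numbers, 500 of the 1000 items are delivered, 373 are dropped, and 127 are still sitting in the queue when the run ends. Nothing crashes, but the consumer does not see every value.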

If you need a buffer size other than 128, it seems you can set it like this (but watch out for memory limits):

 myObservable.toFlowable(BackpressureStrategy.MISSING).onBackpressureBuffer(256); // bounded buffer of 256 items (buffer(256) would batch items into lists instead); using MISSING might be slower.

In software development, a backpressure strategy usually means that you tell the emitter to slow down a bit because the consumer cannot keep up with the rate at which events are emitted.
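Telling the emitter to slow down can be sketched with the JDK's `java.util.concurrent.Flow` interfaces (Java 9 and later), where the subscriber asks for items through `Subscription.request(n)`. This is not RxJava code: `RequestSketch`, `RangePublisher`, `PullingSubscriber`, and `receivedAfter` are hypothetical names, and the sketch is single-threaded and skips most of the rules a real publisher has to follow (for example, it does not validate `n`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestSketch {

    // Hypothetical publisher of 1..count that emits only what was requested.
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override
                public void request(long n) {
                    // Emit at most n items, then wait for the next request.
                    for (long i = 0; i < n && next <= count && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Hypothetical subscriber that requests nothing until pull(n) is called.
    static final class PullingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }

        void pull(long n) { subscription.request(n); }
    }

    // Subscribes to 1..count and requests `batch` items `pulls` times.
    static List<Integer> receivedAfter(int count, int batch, int pulls) {
        PullingSubscriber subscriber = new PullingSubscriber();
        new RangePublisher(count).subscribe(subscriber);
        for (int i = 0; i < pulls; i++) {
            subscriber.pull(batch);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(receivedAfter(5, 2, 0));
        System.out.println(receivedAfter(5, 2, 1));
        System.out.println(receivedAfter(5, 2, 3));
    }
}
```

The point of the design is that the consumer sets the pace: the publisher emits nothing until asked, and never more than the requested amount, so no queue can build up between the two.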

+79
May 30 '17 at 17:29

The fact that your Flowable crashed after emitting 128 values without backpressure handling does not mean it will always crash after exactly 128 values: sometimes it will crash after 10, and sometimes it will not crash at all. I believe this is what happened when you tried the Observable example: no backpressure occurred, so your code worked fine, but next time it may not. The difference in RxJava 2 is that Observable no longer has any concept of backpressure, so there is no way to handle it there. If you are designing a reactive sequence that is likely to require explicit backpressure handling, then Flowable is your best choice.

+13
Oct 29 '16 at 20:43


