RxJava rx.exceptions.MissingBackpressureException with filter and map


I just started playing with RxJava / RxAndroid and am having trouble figuring out how to handle backpressure correctly.

I have a file scanner that scans directories and emits files. These files should be processed as quickly as possible, and none of them should be skipped.

So, the pipeline looks like this: Observable<File> -> Filter<File, Boolean> {check if file is of type .xyz}

Unfortunately, I get an rx.exceptions.MissingBackpressureException. So I read about backpressure and, if I understand correctly, the only lossless options are buffers and windows.

I have tried onBackpressureBuffer(), buffer() and window(). None of the onBackpressureX() operators seemed to have any effect, while buffer() groups the elements into a List<File>. My questions:

  • How do I filter these groups? filter(<List<File>>, Boolean) doesn't make sense ...
  • How can I implement backpressure in my file-emitting Observable so that it waits until my pipeline / operators / subscribers have capacity?
  • Is it good practice to convert the elements, e.g. with map() into XYZ entities, and store them in a separate list as a side effect inside the operator instead of in an active subscriber?

Any feedback or tips would be very helpful and appreciated.

1 answer

I think I found a solution to the problem. This code did not work:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // the buffer sits downstream of observeOn() here
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                // keep only files of type .xyz
                return file.getAbsolutePath().endsWith(".xyz");
            }
        })
        .buffer(100)
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ });

However, this code works:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
        // the buffer sits directly after the source, upstream of observeOn()
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                // keep only files of type .xyz
                return file.getAbsolutePath().endsWith(".xyz");
            }
        })
        .buffer(100)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ });

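So the position of subscribeOn() and observeOn() in the chain matters!

As far as I can tell, observeOn() has its own small bounded queue and signals MissingBackpressureException when a source that ignores requests overflows it, so onBackpressureBuffer() has to come before observeOn() to be of any use.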
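For the second question, one option might be to make the scanner pull-based, so that it only looks for the next file when somebody asks for it. Below is a minimal sketch in plain Java that runs without RxJava. The class name LazyFileTree is made up for illustration and is not part of the scanner from the question; it assumes a plain depth-first walk is acceptable and silently skips unreadable directories.

```java
import java.io.File;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical helper: walks a directory tree lazily, one file per next() call. */
public class LazyFileTree implements Iterable<File> {
    private final File root;

    public LazyFileTree(File root) {
        this.root = root;
    }

    @Override
    public Iterator<File> iterator() {
        // Entries that still have to be visited (files and directories).
        final Deque<File> pending = new ArrayDeque<>();
        pending.push(root);

        return new Iterator<File>() {
            // Next regular file to hand out, or null if none has been looked up yet.
            private File next;

            @Override
            public boolean hasNext() {
                // Expand directories only until one regular file is found.
                while (next == null && !pending.isEmpty()) {
                    File f = pending.pop();
                    if (f.isDirectory()) {
                        File[] children = f.listFiles(); // null if unreadable
                        if (children != null) {
                            for (File c : children) {
                                pending.push(c);
                            }
                        }
                    } else if (f.isFile()) {
                        next = f;
                    }
                }
                return next != null;
            }

            @Override
            public File next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                File result = next;
                next = null;
                return result;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
```

In RxJava 1.x this could be wrapped as Observable.from(new LazyFileTree(mountPoint)). from(Iterable) honors backpressure, so it should only call next() as often as downstream requests items, and onBackpressureBuffer() may then become unnecessary.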


Source: https://habr.com/ru/post/1619725/