From RxJava 1 to RxJava 2

I am trying to convert this RxJava 1 code to RxJava 2:

public static Observable<Path> listFolder(Path dir, String glob) {
    return Observable.<Path>create(subscriber -> {
        try {
            DirectoryStream<Path> stream =
                    Files.newDirectoryStream(dir, glob);

            subscriber.add(Subscriptions.create(() -> {
                try {
                    stream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }));
            Observable.<Path>from(stream).subscribe(subscriber);
        } catch (DirectoryIteratorException ex) {
            subscriber.onError(ex);
        } catch (IOException ioe) {
            subscriber.onError(ioe);
        }
    });
}

The problem is that in RxJava 2 I do not get a Subscriber to which I can add a new Subscription.

1 answer

Enjoy the conciseness of RxJava 2 (Flowable is the class that supports backpressure):

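public static Flowable<Path> listFolder(Path dir, String glob) {
    return Flowable.using(
        // Opens the directory stream for each subscriber.
        () -> Files.newDirectoryStream(dir, glob),
        // Emits the entries of the opened stream.
        stream -> Flowable.fromIterable(stream),
        // Closes the stream on termination or cancellation.
        stream -> stream.close());
}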
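
For comparison, the open / iterate / close lifecycle that using manages is roughly what try-with-resources does in blocking code. The following is only a sketch of that analogy, not part of the answer: the class name ListFolderBlocking and the method name listFolderBlocking are hypothetical, and unlike the reactive version it collects every entry eagerly into a list.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ListFolderBlocking {
    // Hypothetical blocking counterpart of listFolder, for illustration only.
    public static List<Path> listFolderBlocking(Path dir, String glob) throws IOException {
        List<Path> result = new ArrayList<>();
        // Resource acquisition: corresponds to the first argument of using(...).
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            // Consumption: corresponds to fromIterable(stream).
            for (Path entry : stream) {
                result.add(entry);
            }
        } // Disposal: the stream is closed here, like the third argument of using(...).
        return result;
    }
}
```

The reactive version differs in that the stream is opened only when someone subscribes, and it is closed when the sequence terminates or the subscriber cancels.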

If you do not want backpressure, replace Flowable with Observable.
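
A minimal sketch of that Observable variant, assuming RxJava 2.x (package io.reactivex) is on the classpath. The wrapper class name ListFolderObservable is hypothetical and only there to make the snippet compile on its own.

```java
import io.reactivex.Observable;
import java.nio.file.Files;
import java.nio.file.Path;

public class ListFolderObservable {
    // Same shape as the Flowable version, but without backpressure support.
    public static Observable<Path> listFolder(Path dir, String glob) {
        return Observable.using(
            // Opens a fresh DirectoryStream for each subscriber.
            () -> Files.newDirectoryStream(dir, glob),
            // Emits every entry of the opened stream.
            stream -> Observable.fromIterable(stream),
            // Closes the stream when the sequence ends or is disposed.
            stream -> stream.close());
    }
}
```

A caller could, for example, collect the entries with listFolder(dir, "*.txt").toList().blockingGet(). The order of the entries is whatever the underlying DirectoryStream yields, which is not guaranteed.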


Source: https://habr.com/ru/post/1659472/

