N-ary Cartesian product in RxJava

I have an Observable<Observable<Integer>>. How can I transform it into an Observable<int[]> that emits the n-ary Cartesian product?

For instance:

Observable<Observable<Integer>> ob = Observable.just(
  Observable.just(0,1),
  Observable.just(2,3),
  Observable.just(4,5)
  );
ob...... ->   (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5)
3 answers

First of all, you need a fixed (finite) number of input Observables. Secondly, there is no need to block, but you will most likely need to cache, because the second, third, and subsequent Observables have to be consumed several times.

import java.util.*;

import io.reactivex.Observable;

public class Cartesian {

    static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) {
        return sources.toList().flatMapObservable(list -> cartesian(list));
    }

    static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
        if (sources.size() == 0) {
            return Observable.<int[]>empty();
        }
        Observable<int[]> main = sources.get(0).map(v -> new int[] { v });

        for (int i = 1; i < sources.size(); i++) {
            int j = i;
            Observable<Integer> o = sources.get(i).cache();
            main = main.flatMap(v -> {
                return o.map(w -> {
                    int[] arr = Arrays.copyOf(v, j + 1);
                    arr[j] = w;
                    return arr;
                });
            });
        }

        return main;
    }

    public static void main(String[] args) {
        cartesian(Observable.just(
            Observable.just(0, 1), 
            Observable.just(2, 3), 
            Observable.just(4, 5)
        ))
        .subscribe(v -> System.out.println(Arrays.toString(v)));
    }
}
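
The fold used above can also be checked without RxJava. The following is only a sketch under that assumption: the class CartesianLists and the helper cartesianLists are hypothetical names, and plain lists stand in for the Observables. Each partial tuple is extended with every element of the next source, which is what the flatMap/map pair does; for synchronous sources such as Observable.just the emission order should match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CartesianLists {

    // Hypothetical helper: the same fold as cartesian(List), but over plain lists.
    static List<int[]> cartesianLists(List<List<Integer>> sources) {
        List<int[]> main = new ArrayList<>();
        if (sources.isEmpty()) {
            return main; // mirrors Observable.empty() for zero inputs
        }
        // Seed with one-element tuples taken from the first source.
        for (int v : sources.get(0)) {
            main.add(new int[] { v });
        }
        // Extend every partial tuple with every element of the next source.
        for (int i = 1; i < sources.size(); i++) {
            List<int[]> next = new ArrayList<>();
            for (int[] prefix : main) {
                for (int w : sources.get(i)) {
                    int[] arr = Arrays.copyOf(prefix, i + 1);
                    arr[i] = w;
                    next.add(arr);
                }
            }
            main = next;
        }
        return main;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
            Arrays.asList(0, 1),
            Arrays.asList(2, 3),
            Arrays.asList(4, 5));
        for (int[] tuple : cartesianLists(sources)) {
            System.out.println(Arrays.toString(tuple));
        }
    }
}
```

The inner loop walks the same source once per prefix, which is why the Observable version needs cache() on every source after the first.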

Creating a Cartesian product asynchronously is difficult or, in a sense, impossible. If blocking is acceptable, you can do something like this:

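import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.Single;

public class Main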
{

    static class ProductIterator<T> implements Iterator<T[]>
    {
        private final List<List<T>> componentsList;
        private final Class<T> componentClass;
        private final int[] indices;
        private boolean hasNext;

        public ProductIterator(List<List<T>> componentsList, Class<T> componentClass)
        {
            this.componentsList = componentsList;
            this.componentClass = componentClass;
            this.indices = new int[componentsList.size()];
            // The product is empty if there are no components or if any component is empty.
            this.hasNext = !componentsList.isEmpty() && componentsList.stream().noneMatch(List::isEmpty);
        }

        @Override
        public boolean hasNext()
        {
            return hasNext;
        }

        @Override
        public T[] next()
        {
            T[] res = (T[]) Array.newInstance(componentClass, componentsList.size());
            for (int i = 0; i < componentsList.size(); i++)
            {
                res[i] = componentsList.get(i).get(indices[i]);
            }

            // move next
            indices[0]++;
            for (int i = 0; i < componentsList.size() - 1; i++)
            {
                if (indices[i] == componentsList.get(i).size())
                {
                    indices[i] = 0;
                    indices[i + 1]++;
                }
            }
            hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();

            return res;
        }
    }

    public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass)
    {
        return Observable.fromIterable(new Iterable<T[]>()
        {
            @Override
            public Iterator<T[]> iterator()
            {
                // postpone blocking up until iterator is requested 
                // and by this point we can't postpone anymore 
                Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList();
                return new ProductIterator<T>(componentsList.blockingGet(), componentClass);
            }
        });
    }

    public static void main(String[] args) throws Exception
    {
        Observable<Observable<Integer>> ob = Observable.just(
                Observable.just(0, 1),
                Observable.just(2, 3),
                Observable.just(4, 5)
        );

        Observable<Integer[]> product = product(ob, Integer.class);
        product.forEach(a -> System.out.println(Arrays.toString(a)));
    }
}
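
The "move next" step in ProductIterator is essentially a mixed-radix counter. Below is a minimal stand-alone sketch of that idea, not the code above: the class Odometer and the helper advance are hypothetical names, and every component is assumed to be non-empty. As in ProductIterator, the first position changes fastest.

```java
import java.util.Arrays;

class Odometer {

    // Hypothetical helper: advances indices in place, first position fastest.
    // Returns false once the counter has wrapped past the last combination.
    // Assumes every entry of sizes is at least 1.
    static boolean advance(int[] indices, int[] sizes) {
        for (int i = 0; i < indices.length; i++) {
            indices[i]++;
            if (indices[i] < sizes[i]) {
                return true;  // no carry needed
            }
            indices[i] = 0;   // wrap this position and carry into the next one
        }
        return false;         // carried out of the last position
    }

    public static void main(String[] args) {
        int[] sizes = { 2, 2, 2 };
        int[] indices = new int[sizes.length];
        do {
            System.out.println(Arrays.toString(indices));
        } while (advance(indices, sizes));
    }
}
```

Each index array selects one element per component, so mapping the indices through the component lists yields one tuple of the product per step.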



Well, I can solve it myself. But is there a more elegant way?
(The toArray method converts an Observable<T> to a T[].)

    Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) {
        List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last();
        return Observable.create(new SyncOnSubscribe<int[], int[]>() {
            @Override
            protected int[] generateState() {
                int[] array = new int[list.size()];
                Arrays.fill(array, 0);
                return array;
            }

            @Override
            protected int[] next(int[] state, Observer<? super int[]> observer) {
                int[] next = new int[list.size()];
                for (int i = 0; i < next.length; i++) {
                    next[i] = list.get(i)[state[i]];
                }
                observer.onNext(next);
                state[state.length - 1]++;
                for (int i = state.length - 1; i >= 0; i--) {
                    int delta = list.get(i).length - state[i];
                    if (delta > 0) {
                        break;
                    } else if (delta == 0) {
                        state[i] = 0;
                        if (i == 0) {
                            observer.onCompleted();
                            break;
                        }
                        state[i - 1]++;
                    }
                }
                return state;
            }
        });
    }

Source: https://habr.com/ru/post/1673796/

