RxJava- Group, Emit and Zip Sorted "chunks" with a common property?

My colleagues and I face frequent problems, and I hope that reactive programming can solve the problem. This will probably require my implementation of Operatoreither Transformer.

I want to take any Observable<T>emitting Telements, but I want the operator to group them on display Tand select each group as List<T>, or even better, some kind of shared drive Collectorfrom Java 8 threads.

But here is the hard part, which I don’t think can do groupBy(). I want to take two observables passing through this Operator and assume that the emitted elements are sorted by this property (the incoming data will be selected from the sorted SQL query and displayed on the object T). The operator will sequentially accumulate elements until the property changes, and then it publishes this group and proceeds to the next. Thus, I can take each group of data from each Observed, zip code and process these two pieces, then throw them away and move on to the next. In this way, I can maintain a semi-buffered state and maintain low memory consumption.

So, if I sorted, grouped and buttoned on PARTITION_ID, this is visually what I'm trying to accomplish.

enter image description here

, , , . , . GC , .

, , , , , . ?

public final class  SortedPartitioner<T,P,C,R> implements Transformer<T,R> {

 private final Function<T,P> mappedPartitionProperty;
 private final Supplier<C> acculatorSupplier;
 private final BiConsumer<T,R> accumulator;
 private final Function<C,R> finalResult;


 private SortedPartitioner(Function<T, P> mappedPartitionProperty, Supplier<C> acculatorSupplier,
   BiConsumer<T, R> accumulator, Function<C, R> finalResult) {
  this.mappedPartitionProperty = mappedPartitionProperty;
  this.acculatorSupplier = acculatorSupplier;
  this.accumulator = accumulator;
  this.finalResult = finalResult;
 }
 public static <T,P,C,R> SortedPartitioner<T,P,C,R> of(
   Function<T,P> mappedPartitionProperty, 
   Supplier<C> accumulatorSupplier,
   BiConsumer<T,R> accumulator,
   Function<C,R> finalResult) {

  return new SortedPartitioner<>(mappedPartitionProperty, accumulatorSupplier, accumulator, finalResult);

 }
 @Override
 public Observable<R> call(Observable<T> t) {
  return null;
 }

}
+4
2

, Maven Central .

pom.xml.

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-extras</artifactId>
    <version>0.5.13</version>
</dependency>

partition_id :

import com.github.davidmoten.rx.Transformers;

Observable<List<Item>> grouped = items.compose(
    Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

(. Transformers.collectWhile , ), github.

zip.

+1

, .

materialize, scan flatMap. scan partitionId , . materialize , , , , . flatMap , ( partitionId) (), .

unit test, 1, 1, 2, 2, 2, 3 {1, 1}, {2, 2, 2}, {3}.

.

:

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.junit.Test;

import rx.Observable;

public class StateMachineExampleTest {

    @Test
    public void testForStackOverflow() {
        Observable<Integer> a = Observable.just(1, 1, 2, 2, 2, 3);
        State<Integer> initial = new State<Integer>(Collections.emptyList(), Optional.empty(),
                false);
        List<List<Integer>> lists = a.materialize()
                // accumulate lists and uses onCompleted notification to emit
                // left overs when source completes
                .scan(initial,
                        (state, notification) -> {
                            if (notification.isOnCompleted()) {
                                return new State<>(null, state.value, true);
                            } else if (notification.isOnError())
                                throw new RuntimeException(notification.getThrowable());
                            else if (state.list.size() == 0) {
                                return new State<>(Arrays.asList(notification.getValue()), Optional
                                        .empty(), false);
                            } else if (partitionId(notification.getValue()) == partitionId(state.list
                                    .get(0))) {
                                List<Integer> list = new ArrayList<>();
                                list.addAll(state.list);
                                list.add(notification.getValue());
                                return new State<>(list, Optional.empty(), false);
                            } else if (state.value.isPresent()) {
                                if (partitionId(state.value.get()) == partitionId(notification
                                        .getValue())) {
                                    return new State<>(Arrays.asList(state.value.get(),
                                            notification.getValue()), Optional.empty(), false);
                                } else {
                                    return new State<>(Arrays.asList(state.value.get()), Optional
                                            .of(notification.getValue()), false);
                                }
                            } else {
                                return new State<>(state.list,
                                        Optional.of(notification.getValue()), false);
                            }
                        })
                // emit lists from state
                .flatMap(state -> {
                    if (state.completed) {
                        if (state.value.isPresent())
                            return Observable.just(Arrays.asList(state.value.get()));
                        else
                            return Observable.empty();
                    } else if (state.value.isPresent()) {
                        return Observable.just(state.list);
                    } else {
                        return Observable.empty();
                    }
                })
                // get as a list of lists to check
                .toList().toBlocking().single();
        assertEquals(Arrays.asList(Arrays.asList(1, 1), Arrays.asList(2, 2, 2), Arrays.asList(3)),
                lists);
    }

    private static int partitionId(Integer n) {
        return n;
    }

    private static final class State<T> {
        final List<T> list;
        final Optional<T> value;
        final boolean completed;

        State(List<T> list, Optional<T> value, boolean completed) {
            this.list = list;
            this.value = value;
            this.completed = completed;
        }
    }

}

, . .

, , , materialize, scan flatMap, , , zip.

+1

Source: https://habr.com/ru/post/1598749/


All Articles