How to handle a stream without .flatMap, which splits the reactive stream and prevents operators such as distinctUntilChanged from working on the whole stream

I want to run a different observable chain for each implementation of State. This is easy to achieve with a sealed class (algebraic data type / union) plus .flatMap(), but that splits the stream: operators such as .distinctUntilChanged() then work only inside the .flatMap() function, not across the whole stream.

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()
    relay.flatMap {
        fun handle(state: State): Observable<*> = when (state) {
            State.Loading -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println("loading") }

            is State.Loaded -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println(it.value) }
        }
        handle(it)
    }
            .subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    //desired: loading, 1, 2, 3
    //actual: loading, 1, 2, 3, 3
}

Note: this is a simplified example. Here I am just printing, but in reality I want to perform different actions (render the user interface differently) depending on the State implementation.

This can be done with a Subject / Relay, but that creates a disconnected, mutable stream, which I would also like to avoid.


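Each Observable.just(state) created inside .flatMap() emits a single item, so .distinctUntilChanged() has nothing to compare it against.

One option is to filter the relay by state type, apply .distinctUntilChanged() to each filtered stream, and merge the results: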

@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()

    val loadingObs = relay.filter { it is State.Loading }
                          .distinctUntilChanged()
                          .doOnNext { println("loading") }

    // ofType both filters and casts, so it.value is accessible below
    val loadedObs = relay.ofType(State.Loaded::class.java)
                         .distinctUntilChanged()
                         .doOnNext { println(it.value) }

    val merged = loadingObs.mergeWith(loadedObs)

    merged.subscribe()

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    // expected output: loading, 1, 2, 3
}
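
If the goal is for distinctUntilChanged() to see the whole stream rather than one stream per state type, another option is to deduplicate first and branch afterwards with a plain `when`. The sketch below assumes RxJava 2 and RxRelay 2 package names; `render` is a hypothetical helper that is not part of the question, and it returns strings only so the result is easy to inspect.

```kotlin
import com.jakewharton.rxrelay2.PublishRelay // assumption: RxRelay 2.x
import io.reactivex.Observable               // assumption: RxJava 2.x

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}

// Hypothetical helper: drops consecutive duplicates across the whole
// stream, then picks a branch per State implementation.
fun render(states: Observable<State>): Observable<String> =
    states
        .distinctUntilChanged() // compares each item with the previous one via equals()
        .map { state ->
            when (state) {
                State.Loading -> "loading"
                is State.Loaded -> state.value.toString()
            }
        }

fun main() {
    val relay = PublishRelay.create<State>()
    render(relay).subscribe { println(it) }

    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    relay.accept(State.Loaded(3))
    // prints: loading, 1, 2, 3
}
```

The two approaches differ when states alternate. For the sequence Loading, Loaded(1), Loading, this version emits "loading" twice, because the two Loading items are not adjacent in the whole stream. The filter-and-merge version emits it once, because its Loading-only stream sees two consecutive Loading items.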

Source: https://habr.com/ru/post/1016642/

