I want to run a different observable chain for each implementation of State. This is easy to do with a sealed class (algebraic data type / union) plus .flatMap(), but that splits the stream: operators such as .distinctUntilChanged() then apply only inside the function passed to .flatMap(), not across the whole stream.
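// Imports assume RxJava 2, RxRelay 2 and JUnit 4; adjust for other versions.
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable
import org.junit.Test

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}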
@Test fun distinctTest() {
    val relay = PublishRelay.create<State>()
    relay.flatMap {
        // Each inner Observable.just(...) emits exactly one item, so the
        // distinctUntilChanged() calls below never have a previous value to compare against.
        fun handle(state: State): Observable<*> = when (state) {
            State.Loading -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println("loading") }
            is State.Loaded -> Observable.just(state)
                    .distinctUntilChanged()
                    .doOnNext { println(it.value) }
        }
        handle(it)
    }
            .subscribe()
    relay.accept(State.Loading)
    relay.accept(State.Loaded(1))
    relay.accept(State.Loaded(2))
    relay.accept(State.Loaded(3))
    // The duplicate is not filtered out: "3" is printed a second time.
    relay.accept(State.Loaded(3))
}
Note: this is a simplified example. Here I am only printing, but what I actually want is to perform different actions (render the UI differently) depending on which State implementation arrives.
This could be done with a Subject / Relay, but that creates a disconnected, mutable stream, which I would also like to avoid.
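To make that alternative concrete, here is a rough sketch of the Relay-based workaround I mean. The helper `renderWithRelays` and the relay names `states`, `loading` and `loaded` are made up for illustration, the import assumes RxJava 2 with RxRelay 2, and results are collected into a list instead of being printed inside the chain so they are easy to inspect.

```kotlin
// Assumed dependencies: RxJava 2 and RxRelay 2 (adjust the import for other versions).
import com.jakewharton.rxrelay2.PublishRelay

sealed class State {
    object Loading : State()
    data class Loaded(val value: Int) : State()
}

// Hypothetical helper: pushes the given states through one relay per State
// implementation and returns what would be rendered, in order.
fun renderWithRelays(input: List<State>): List<String> {
    val rendered = mutableListOf<String>()
    val states = PublishRelay.create<State>()

    // One extra relay per implementation: these are the disconnected, mutable streams.
    val loading = PublishRelay.create<State.Loading>()
    val loaded = PublishRelay.create<State.Loaded>()

    // distinctUntilChanged() now sees every item of its own type, not just one.
    loading.distinctUntilChanged().subscribe { rendered += "loading" }
    loaded.distinctUntilChanged().subscribe { rendered += it.value.toString() }

    // Manual routing from the source stream into the per-type relays.
    states.subscribe { state ->
        when (state) {
            is State.Loading -> loading.accept(state)
            is State.Loaded -> loaded.accept(state)
        }
    }

    input.forEach { states.accept(it) }
    return rendered
}

fun main() {
    val input = listOf(
        State.Loading,
        State.Loaded(1),
        State.Loaded(2),
        State.Loaded(3),
        State.Loaded(3)
    )
    // Prints: loading, 1, 2, 3 (the second Loaded(3) is dropped)
    renderWithRelays(input).forEach(::println)
}
```

This filters the duplicate Loaded(3), but only by hand-routing items into extra relays that anything could push into, which is exactly the part I would like to get rid of.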