I am trying to understand the purpose of this library from Jake Warton: https://github.com/JakeWharton/RxRelay
Basically: the subject, except when it is not possible to call onComplete or onError. Subjects are in a state of damage and damage: when they receive onComplete or onError, they can no longer be used to move data.
I realized that this is the right use case, but the above seems easy to achieve by simply using existing items.
1. Do not forward errors / completions messages to the topic:
'observable.subscribe({ subject.onNext(it) }, { log error / throw exception },{ ... })'
2. Do not open the topic, make sure that the signature of your method returns the observable.
fun(): Observable<> { return subject }
Obviously, I am missing something here, and I am very interested in what it is!
class MyPublishRelay<I> : Consumer<I> { private val subject: Subject<I> = PublishSubject.create<I>() override fun accept(intent: I) = subject.onNext(intent) fun subscribe(): Disposable = subject.subscribe() fun subscribe(c: Consumer<in I>): Disposable = subject.subscribe(c)
source share