How to create a relationship between observables?

I want a tool for testing Rx components that works as follows: given a sequence of events `'v seq` that defines the order, as well as a key selector function (`keySelector : 'v -> 'k`), I want to create a `Map<'k, IObservable<'v>>`, with the guarantee that the grouped observables emit their values in the global order defined above.

For example: makeObservables isEven [1;2;3;4;5;6] ... should produce

{ true : -2-4-6|,
  false: 1-3-5| }

This is my attempt:

open System
open System.Reactive.Linq
open FSharp.Control.Reactive

/// Builds an observable that, for each subscriber, first subscribes to `o1`,
/// discards every value `o1` produces, and subscribes the observer to `o2`
/// only once `o1` has completed.
///
/// - Errors raised by `o1` are forwarded to the subscriber; `o2` is then never
///   subscribed to.
/// - Disposing the returned subscription disposes both the subscription to
///   `o1` and, if it has been created, the subscription to `o2`.
let subscribeAfter (o1: IObservable<'a>) (o2 : IObservable<'b>) : IObservable<'b> =
    fun (observer : IObserver<'b>) ->
        // Slot for the subscription to o2. It is created *before* subscribing to
        // o1 because o1 may complete during the Subscribe call itself, in which
        // case OnCompleted below runs before Subscribe returns.
        let inner = new System.Reactive.Disposables.SerialDisposable()
        let tempObserver = { new IObserver<'a> with
                                // Values of o1 are only a timing signal; drop them.
                                member this.OnNext x = ()
                                member this.OnError e = observer.OnError e
                                // Keep the subscription to o2 so that it can be
                                // disposed with the outer one (previously it was
                                // ignored and could never be cancelled).
                                member this.OnCompleted () =
                                    inner.Disposable <- (o2 |> Observable.subscribeObserver observer)
                            }
        let outer = o1.Subscribe tempObserver
        System.Reactive.Disposables.StableCompositeDisposable.Create(outer, inner) :> IDisposable
    |> Observable.Create

/// Splits `xs` into one observable per key produced by `keySelector`.
///
/// Every element is turned into a single-value observable. Every element
/// except the first is additionally wrapped in `subscribeAfter`, so that it is
/// only subscribed to after the observable of its predecessor in `xs` has
/// completed. The observables sharing a key are then concatenated, in input
/// order, into the observable stored under that key.
///
/// Returns an empty map when `xs` is empty (previously `Seq.head` raised).
///
/// NOTE(review): the reported behaviour of this construction is that merging
/// the groups of [1;2;3;4;5] prints 1 3 5 2 4 rather than 1 2 3 4 5. Presumably
/// each `subscribeAfter` re-subscribes to its own copy of the predecessor, so
/// the groups do not actually wait for each other - confirm before relying on
/// cross-group ordering.
let makeObservables (keySelector : 'a -> 'k) (xs : 'a seq) : Map<'k, IObservable<'a>> =
    // Enumerate the input exactly once; the original walked `xs` twice
    // (once for the head, once for the pairwise chain).
    let keyed =
        xs
        |> Seq.map (fun x -> (keySelector x, Observable.single x))
        |> List.ofSeq

    match keyed with
    | [] -> Map.empty
    | first :: _ ->
        // Element n+1 is gated on the completion of element n.
        let chained =
            keyed
            |> Seq.pairwise
            |> Seq.map (fun ((_, previous), (key, current)) -> (key, subscribeAfter previous current))

        // Seq.pairwise drops the very first element, so put it back in front.
        chained
        |> Seq.append (Seq.singleton first)
        |> Seq.groupBy fst
        |> Seq.map (fun (key, group) -> (key, group |> Seq.map snd |> Observable.concatSeq))
        |> Map.ofSeq

/// Demo entry point: splits 1..5 into even and odd groups, merges the group
/// observables back together and prints every value that comes out.
/// Waits for a key press before exiting so the output stays visible.
[<EntryPoint>]
let main argv = 
    let isEven n = n % 2 = 0

    // One observable per parity.
    let groups : Map<bool, IObservable<int>> =
        [1;2;3;4;5] |> makeObservables isEven

    // Merge all group observables into a single stream.
    let merged =
        groups
        |> Map.toSeq
        |> Seq.map snd
        |> Observable.mergeSeq

    // Kept alive (and disposed) for the lifetime of main.
    use subscription = merged |> Observable.subscribe (printfn "%A")

    Console.ReadKey() |> ignore
    0 // exit code

... but the results are not as expected: the observed values are not in the global order.

It seems that the elements within each group are produced in the correct order, but when the groups are combined, the result behaves more like a concat than a merge.

Expected output: 1 2 3 4 5 ... but the actual output is: 1 3 5 2 4

What am I doing wrong?

Thanks!

+4
1

:

{ true : -2-4-6|,
  false: 1-3-5| }

:

{ true : 246|,
  false: 135| }

, merge . Rx , 1 2, merge .

, , merge .

+2

Source: https://habr.com/ru/post/1673106/


All Articles