Downstream RX Filter

I am trying to filter out further upstream elements based on the state of the downstream. mapProcess essentially starts the process (script or exe). The process may take some time, and I want to ignore any subsequent elements of the thread until it ends. createProcess also returns an Observable of StdOut. We switch to the IObservable created by createProcess and map arg to StdOut.

Example:

let mapProcess obs =
  obs
  |> Observable.map (fun arg -> createProcess arg)
  |> Observable.switch

WHAT I AM: This works, but is not completely happy with the mutable here.

let mapProcess obs =
  let mutable processNotRunning = true
  obs
  |> Observable.filter (fun _ -> processNotRunning)
  |> Observable.map (fun arg -> processNotRunning <- false
                                createProcess arg)  
  |> Observable.switch
  |> Observable.iter (fun _ -> processNotRunning <- true)
  |> Observable.finallyDo (fun _ -> processNotRunning <- true)

, - "switchIfSeen" Observable, , . , , RX?

+4
1

[...] ,

- - , . , .

  var map = argn.Select(CreateProcess).Publish().RefCount();

        map.SelectMany(o => o) //flatmap
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));

:

(CreateProcess - , x100 .)

    private static void Main(string[] args)
    {
        var argn = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().RefCount();
        argn.Subscribe(Console.WriteLine);

        var map = argn.Select(CreateProcess).Publish().RefCount();

        map.SelectMany(o => o)
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));

        Console.ReadKey();
    }

    static IObservable<long> CreateProcess(long i) => Observable.Timer(TimeSpan.FromMilliseconds(i * 100)).Select(_ => i);

:

0
Did task which took 0msecs
1
2
Did task which took 100msecs
3
4
5
Did task which took 300msecs
6
7
8
9
10
11
Did task which took 600msecs
+2

Source: https://habr.com/ru/post/1692198/


All Articles