I am trying to filter out further upstream elements based on the state of the downstream. mapProcess essentially starts a process (a script or an exe). The process may take some time, and I want to ignore any subsequent elements of the stream until it ends. createProcess returns an Observable of StdOut, so we map each arg to the IObservable created by createProcess and then switch to it.
Example:
    let mapProcess obs =
        obs
        |> Observable.map (fun arg -> createProcess arg)
        |> Observable.switch
What I have so far: this works, but I am not completely happy with the mutable here.
    let mapProcess obs =
        let mutable processNotRunning = true
        obs
        |> Observable.filter (fun _ -> processNotRunning)
        |> Observable.map (fun arg ->
            processNotRunning <- false
            createProcess arg)
        |> Observable.switch
        |> Observable.iter (fun _ -> processNotRunning <- true)
        |> Observable.finallyDo (fun _ -> processNotRunning <- true)
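To make the intended behavior concrete, the flag could be hidden inside a single operator instead of being spread across filter, map, iter and finallyDo. The following is only a minimal sketch under some assumptions: `exhaustMap` is a hypothetical, hand-written helper (the name is borrowed from the RxJS operator with this behavior; it is not an Rx.NET API), it relies only on `System.IObservable`, and it assumes the observable returned by createProcess completes when the process exits.

```fsharp
open System

/// Hypothetical helper, not part of Rx.NET: maps each element to an inner
/// observable, but drops elements that arrive while an inner one is still active.
let exhaustMap (f: 'a -> IObservable<'b>) (source: IObservable<'a>) : IObservable<'b> =
    { new IObservable<'b> with
        member _.Subscribe(downstream: IObserver<'b>) =
            let gate = obj ()                            // guards the two flags below
            let busy = ref false                         // true while an inner observable is active
            let outerDone = ref false                    // true once the source has completed
            let inner = ref (None: IDisposable option)   // latest inner subscription

            let innerObserver =
                { new IObserver<'b> with
                    member _.OnNext(v) = downstream.OnNext(v)
                    member _.OnError(e) = downstream.OnError(e)
                    member _.OnCompleted() =
                        // Inner finished: accept upstream elements again.
                        let finish =
                            lock gate (fun () ->
                                busy.Value <- false
                                outerDone.Value)
                        if finish then downstream.OnCompleted() }

            let outerObserver =
                { new IObserver<'a> with
                    member _.OnNext(v) =
                        // Start a new inner observable only when none is running.
                        let start =
                            lock gate (fun () ->
                                if busy.Value then
                                    false
                                else
                                    busy.Value <- true
                                    true)
                        if start then
                            inner.Value <- Some ((f v).Subscribe(innerObserver))
                    member _.OnError(e) = downstream.OnError(e)
                    member _.OnCompleted() =
                        // Complete downstream now only if no inner is still running.
                        let finish =
                            lock gate (fun () ->
                                outerDone.Value <- true
                                not busy.Value)
                        if finish then downstream.OnCompleted() }

            let outerSub = source.Subscribe(outerObserver)
            { new IDisposable with
                member _.Dispose() =
                    outerSub.Dispose()
                    inner.Value |> Option.iter (fun d -> d.Dispose()) } }
```

With such a helper, mapProcess would reduce to `obs |> exhaustMap createProcess`. Unlike the version above, the gate reopens only when the inner observable completes, not on every StdOut element, which may or may not be what is wanted.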
Is there something like a "switchIfSeen" Observable for this in RX?