IObservable Subscribers Self-Support

I have F # code that looks like this:

module O = Control.Observable
//...
use ss = serve' 4000
         |> O.subscribe
            (fun c -> use cs = RSS.items
                               |> O.subscribe (bytes >> c.SendAll) |> ignore)

Where

serve'    : int -> IObservable<Socket>
c         : Socket
RSS.items : IObservable<XElement>
bytes     : XElement -> byte []
c.SendAll : byte [] -> unit
  • What is the most idiomatic way to keep csuntil c.SendAllit works?
  • Is there or can be determined where, if it fails, the subscription will be deleted; otherwise executed as long as it continues to push? Observable.subscribeUntilError(action)actionactionIObservable
0
source share
1 answer

I came up with this:

let inline Δ<'a> = Unchecked.defaultof<'a>
let inline LOG x = printf "%A" x

module O = Observable
  let d = ref (Δ:IDisposable)
  let b x = try a x with e -> LOG e; let s = !d in if s <> Δ then s.Dispose()
  d := o |> O.subscribe b
  {
    new IDisposable with
      member x.Dispose() = let s = !d in if s <> Δ then d := Δ; s.Dispose()
  }

To demonstrate the difference, try it main!

Usagesubscribe :

use s = new Subject<int>()
use x = s |> O.subscribe (fun _ -> raise <| Exception ())
use y = s |> O.subscribe (printf "%i")
s.OnNext 20

application crashes:

Unhandled Exception: System.Exception: Exception of type 'System.Exception' was thrown.
   at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
   at Program.System.x@604-5.Invoke(Int32 _arg1) in C:\Eniox\Eniox.News.Google\Eniox.News.Google\Program.fs:line 60
   at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1915.System-IObserver`1-OnNext(T value)
   at System.Reactive.Observer`1.OnNext(T value)
   at System.Reactive.Subjects.Subject`1.OnNext(T value)
   at Program.System.main(String[] args) in C:\Eniox\Eniox.News.Google\Eniox.News.Google\Program.fs:line 606

Now usingsubscribeUE :

use s = new Subject<int>()
use x = s |> O.subscribeUE (fun _ -> raise <| Exception ())
use y = s |> O.subscribe   (printf "%i")
s.OnNext 20

x, ! LOG = ignore:

20

, - RX 2.0, , , .

0

Source: https://habr.com/ru/post/1735052/


All Articles