What is the best way to do lazy sampling using Rx?

I am working on a Xamarin application where I have expanded the Connectivity plugin to use Rx rather than events.

My goal was to introduce a slight delay on reconnection so that the network adapter has time to connect to the Internet (a workaround for UWP). If any values arrive during this delay, only the last one needs to be kept, since only the current connection state matters.

This works fine, but it's a bit hacky:

internal static class ConnectivityExtensions
{
    /// <summary>
    /// Wraps the <c>ConnectivityChanged</c> event of <paramref name="this"/> in an observable of
    /// connection states, holding back notifications that follow a reconnection by 250ms.
    /// </summary>
    /// <param name="this">
    /// Connectivity source; its <c>ConnectivityChanged</c> event is observed and its
    /// <c>IsConnected</c> value is read once, when this method is called, to seed the sequence.
    /// </param>
    /// <returns>
    /// A <c>Replay(1).RefCount()</c> sequence of connection states with consecutive duplicates removed.
    /// </returns>
    public static IObservable<bool> ToObservable(this IConnectivity @this)
    {
        var reconnectDelay = TimeSpan.FromMilliseconds(250);

        // Raw connection states taken straight from the event.
        var stateChanges = Observable
            .FromEventPattern<ConnectivityChangedEventHandler, ConnectivityChangedEventArgs>(
                h => @this.ConnectivityChanged += h,
                h => @this.ConnectivityChanged -= h)
            .Select(evt => evt.EventArgs.IsConnected);

        // Tag every state with its arrival time; a "connected" state additionally gets the
        // instant until which subscriber notification should be held back.
        var stamped = stateChanges
            .Timestamp()
            .Select(change => new
            {
                change.Value,
                change.Timestamp,
                DelayUntil = change.Value
                    ? (DateTimeOffset?)change.Timestamp.Add(reconnectDelay)
                    : null
            });

        // A state that arrives while an earlier hold-back is still pending inherits that
        // hold-back instead of using its own.
        var withInheritedDelay = stamped.Scan((previous, latest) => new
        {
            latest.Value,
            latest.Timestamp,
            DelayUntil = latest.Timestamp < previous.DelayUntil
                ? previous.DelayUntil
                : latest.DelayUntil
        });

        // Turn each state into a sampling trigger: states without a hold-back use an empty
        // delay sequence, the others are delayed until their DelayUntil instant.
        var sampleTicks = withInheritedDelay
            .Delay(item => !item.DelayUntil.HasValue
                ? Observable.Empty<DateTimeOffset>()
                : Observable.Return(item.DelayUntil.Value).Delay(item.DelayUntil.Value))
            // Intended to collapse triggers that share the same hold-back instant.
            // NOTE(review): equality here covers Value and Timestamp as well as DelayUntil,
            // so items with different timestamps are not collapsed - confirm whether keying
            // on DelayUntil alone was intended.
            .DistinctUntilChanged()
            .Select(_ => Unit.Default);

        return stateChanges
            .Sample(sampleTicks)
            .StartWith(@this.IsConnected)
            .DistinctUntilChanged()
            .Replay(1)
            .RefCount();
    }
}

Result:

The following sequence of events shows how the reconnection delay (the UWP workaround) and the sampling behave:

                                                                                     // StartWith (initial state): True
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Reconnection: delay 250ms
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: True, ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Sample: False
Thread.Sleep(TimeSpan.FromMilliseconds(250));
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Reconnection: delay 250ms, discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous, discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Delayed by previous, discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, ignored due to DistinctUntilChanged
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = true });  // Reconnection: delay 250ms, discarded due to Sample
ConnectivityChanged(null, new ConnectivityChangedEventArgs { IsConnected = false }); // Delayed by previous
Thread.Sleep(TimeSpan.FromMilliseconds(250));                                        // Sample: False, ignored due to DistinctUntilChanged

// Final output seen by the subscriber:
// True
// False

Is there a better way to achieve this kind of sampling?

+4
2

I ended up using Throttle with a per-value duration selector instead. I wrapped it in my own IConnectivity implementation:

/// <summary>
/// Exposes the connectivity plugin's connection state as an observable, throttling
/// "connected" notifications by 250ms while letting "disconnected" notifications use a
/// throttle sequence that yields immediately.
/// </summary>
internal sealed class ReactiveCrossConnectivity : IConnectivity
{
    /// <summary>
    /// Connection states, seeded with the plugin's <c>IsConnected</c> value read at construction,
    /// with consecutive duplicates removed and shared via <c>Replay(1).RefCount()</c>.
    /// </summary>
    public IObservable<bool> IsConnected { get; }

    /// <summary>
    /// Builds the <see cref="IsConnected"/> pipeline on top of the plugin's
    /// <c>ConnectivityChanged</c> event.
    /// </summary>
    /// <param name="connectivity">Plugin instance whose event and current state are observed.</param>
    /// <param name="scheduler">Supplies the schedulers used by the individual operators.</param>
    public ReactiveCrossConnectivity(
        Plugin.Connectivity.Abstractions.IConnectivity connectivity, 
        ISchedulerProvider scheduler)
    {
        var reconnectSettleTime = TimeSpan.FromMilliseconds(250);

        // Raw connection states taken from the plugin event.
        var rawStates = Observable
            .FromEventPattern<ConnectivityChangedEventHandler, ConnectivityChangedEventArgs>(
                h => connectivity.ConnectivityChanged += h,
                h => connectivity.ConnectivityChanged -= h,
                scheduler.Defaults.ConstantTimeOperations)
            .Select(evt => evt.EventArgs.IsConnected);

        // Per-value throttle window: a timer for "connected", an immediate value for
        // "disconnected". The time-based scheduler is looked up each time a value arrives.
        var settledStates = rawStates.Throttle(connected =>
        {
            if (!connected)
            {
                return Observable.Return<long>(0);
            }

            return Observable.Timer(reconnectSettleTime, scheduler.Defaults.TimeBasedOperations);
        });

        IsConnected = settledStates
            .StartWith(scheduler.Defaults.ConstantTimeOperations, connectivity.IsConnected)
            .DistinctUntilChanged()
            .Replay(1, scheduler.Defaults.Iteration)
            .RefCount();
    }
}

Here is a test for it using NUnit/NSubstitute:

/// <summary>
/// Virtual-time tests for <c>ReactiveCrossConnectivity.IsConnected</c>.
/// </summary>
public sealed class ReactiveCrossConnectivityTest : ReactiveTest
{
    /// <summary>
    /// Raises a series of connectivity changes on a test scheduler and checks that the
    /// observer records only the initial <c>true</c> and a single <c>false</c>.
    /// </summary>
    [Test]
    public void IsConnected_ThrottlesOnConnect()
    {
        // Arrange: a substitute plugin that starts out connected, driven by virtual time.
        var plugin = Substitute.For<Plugin.Connectivity.Abstractions.IConnectivity>();
        plugin.IsConnected.Returns(true);
        var virtualTime = new TestScheduler();
        var sut = new ReactiveCrossConnectivity(
            plugin, new SingleSchedulerProvider(virtualTime));
        var recorder = virtualTime.CreateObserver<bool>();

        sut.IsConnected.Subscribe(recorder);

        // Raises ConnectivityChanged on the substitute with the given state.
        void RaiseChange(bool isConnected)
        {
            plugin.ConnectivityChanged += Raise.Event<ConnectivityChangedEventHandler>(
                plugin,
                new ConnectivityChangedEventArgs { IsConnected = isConnected });
        }

        // Queues a connectivity change at the given virtual time offset.
        void RaiseChangeAt(TimeSpan dueTime, bool isConnected)
        {
            virtualTime.Schedule(dueTime, () => RaiseChange(isConnected));
        }

        RaiseChangeAt(TimeSpan.FromTicks(3), true);
        RaiseChangeAt(TimeSpan.FromMilliseconds(251), false);
        RaiseChangeAt(TimeSpan.FromMilliseconds(501), true);
        RaiseChangeAt(TimeSpan.FromMilliseconds(751), true);
        RaiseChangeAt(TimeSpan.FromMilliseconds(752), false);
        RaiseChangeAt(TimeSpan.FromMilliseconds(753), true);
        RaiseChangeAt(TimeSpan.FromMilliseconds(754), false);
        RaiseChangeAt(TimeSpan.FromMilliseconds(1001), true);
        RaiseChangeAt(TimeSpan.FromMilliseconds(1002), false);

        // Act
        virtualTime.Start();

        // Assert: the seeded value, then the first disconnect one tick after it was raised.
        var firstDisconnectTick = TimeSpan.FromMilliseconds(251).Ticks + 1;
        recorder.Messages.AssertEqual(
            OnNext(2, true),
            OnNext(firstDisconnectTick, false));
    }
}
0

As far as I understand it, this is the marble diagram of what your code does:

T (millis)           : 0----250--500--1000-1250-1500-1750-2000
Connectivity         : ---F-------T---F----T-------F--T-------
ScanValueDelayUntil  : ---null----800-null-1500----null2100---
Sampling             : -------------x-----------x-----------x-
ResultSampled        : T------------T-----------T-----------T-
ResultSampledDistinct: T--------------------------------------

Is this the behavior you intended?

+1

Source: https://habr.com/ru/post/1682678/


All Articles