Transform an Observable while other Observables of transformations are active

I am creating a game with an observable stream of events X representing the products supplied by a manufacturer. There are also external events (let's call them Transformers) that affect production in various ways and for different periods of time. I want to model each Transformer as another observable that emits a function transforming X; that function should be applied to every X until the Transformer's observable completes. The number of Transformers is not known in advance: they are created as a result of user actions (for example, an equipment purchase) or generated randomly (for example, an equipment failure).

It seems to me that I need an IObservable<IObservable<Func<X, X>>> which I should Join (Zip, something else?) with the IObservable<X>. Can you help me? Observable.CombineLatest is almost what I need, but it requires an IEnumerable<IObservable<T>>.

If my description is unclear, here is a marble diagram: [marble diagram]

In more abstract terms, what I need is quite similar to matrix transposition, but with IObservable<IObservable<T>> instead of List<List<T>>.
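To pin down the intended semantics, here is a minimal sketch that leaves Rx out entirely. It assumes the set of currently active transformers is already known at the moment a product arrives; TransformerFold and ApplyAll are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Rx: shows only the per-product step.
public static class TransformerFold
{
    // Applies every currently active transformer to one product value,
    // in the order in which the transformers appear in the sequence.
    public static T ApplyAll<T>(T product, IEnumerable<Func<T, T>> activeTransformers)
    {
        return activeTransformers.Aggregate(product, (current, transform) => transform(current));
    }
}
```

With no active transformers the product passes through unchanged; with x => x + 1 followed by x => x * 2, a product of 1 becomes 4. The open part of the question is how to obtain the active set from the nested observable.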

+4
4 answers

Well, that was a real mind-bender, but I think I have something that works. First, I created an extension method to convert an IObservable<IObservable<Func<T, T>>> to an IObservable<IEnumerable<Func<T, T>>>. The extension method works under the assumption that each inner observable yields only one Func<T, T> before it completes.

public static class MoreReactiveExtensions
{
    public static IObservable<IEnumerable<Func<T, T>>> ToTransformations<T>(this IObservable<IObservable<Func<T, T>>> source)
    {
        return
            Observable
            // Yield an empty enumerable first.
            .Repeat(Enumerable.Empty<Func<T, T>>(), 1)
            // Then yield an updated enumerable every time one of 
            // the transformation observables yields a value or completes.
            .Concat(                                    
                source
                .SelectMany((x, i) => 
                    x
                    .Materialize()
                    .Select(y => new 
                        { 
                            Id = i, 
                            Notification = y 
                        }))
                .Scan(
                    new List<Tuple<int, Func<T, T>>>(),
                    (acc, x) => 
                    {
                        switch(x.Notification.Kind)
                        {
                            // If an observable completed then remove
                            // its corresponding function from the accumulator.
                            case NotificationKind.OnCompleted:
                                acc = 
                                    acc
                                    .Where(y => y.Item1 != x.Id)
                                    .ToList();
                                break;
                            // If an observable yields a new Func then add
                            // it to the accumulator.
                            case NotificationKind.OnNext:
                                acc = new List<Tuple<int, Func<T, T>>>(acc) 
                                    { 
                                        Tuple.Create(x.Id, x.Notification.Value) 
                                    };
                                break;
                            // Do something with exceptions here.
                            default:
                                // Do something here
                                break;
                        }
                        return acc;
                    })
                // Select an IEnumerable<Func<T, T>> here.
                .Select(x => x.Select(y => y.Item2)));
    }
}
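The bookkeeping done inside Scan can be isolated as two pure functions, which makes it easier to reason about without any observables. This is a sketch under the same assumption of one Func<T, T> per inner observable; ActiveSetStep, Add and Remove are hypothetical names, not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the two interesting cases of the Scan accumulator.
public static class ActiveSetStep
{
    // OnNext case: returns a new list that also contains the function
    // registered under the given observable id. The input list is not mutated.
    public static List<Tuple<int, Func<T, T>>> Add<T>(
        List<Tuple<int, Func<T, T>>> active, int id, Func<T, T> transform)
    {
        return new List<Tuple<int, Func<T, T>>>(active) { Tuple.Create(id, transform) };
    }

    // OnCompleted case: returns a new list without any function
    // registered under the given observable id.
    public static List<Tuple<int, Func<T, T>>> Remove<T>(
        List<Tuple<int, Func<T, T>>> active, int id)
    {
        return active.Where(entry => entry.Item1 != id).ToList();
    }
}
```

Both functions return a fresh list rather than mutating the accumulator, so a snapshot that was already emitted downstream is not changed by later notifications.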

Then, given the following variables:

IObservable<IObservable<Func<int, int>>> transformationObservables;
IObservable<int> products;

they can be combined like this:

var transformations =
    transformationObservables
    .ToTransformations()
    .Publish()
    .RefCount();

IObservable<int> transformedProducts =
    transformations
    .Join(
        products,
        t => transformations,
        i => Observable.Empty<int>(),
        (t, i) => t.Aggregate(i, (ii, tt) => tt.Invoke(ii)));


+2

Assuming, for simplicity, that the values are of type int:

IObservable<IObservable<Func<int, int>>> transformerObservables = null;
IObservable<int> values = null;

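First, convert the nested observable into an observable of arrays:

IObservable<IObservable<Func<int, int>>> -> IObservable<Func<int, int>[]>

Each array holds the Func<...> instances that are currently active. To build it, ...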

var transformerArrayObservable = transformerObservables
    // ...attach each transformer the index of the observable it came from:        
    .Select((transformerObservable, index) => transformerObservable
        .Select(transformer => Tuple.Create(index, transformer))
        // Then, materialize the transformer sequence so we get notified when the sequence terminates.
        .Materialize()
        // Now the fun part: Make a scan, resulting in an observable of tuples
        // that have the previous and current transformer
        .Scan(new
        {
            Previous = (Tuple<int, Func<int, int>>)null,
            Current = (Tuple<int, Func<int, int>>)null
        },
        (tuple, currentTransformer) => new
        {
            Previous = tuple.Current,
            Current = currentTransformer.HasValue
                ? currentTransformer.Value
                : (Tuple<int, Func<int, int>>)null
        }))
        // Merge these and do another scan, this time adding and removing
        // the transformers from a list.
        .Merge()
        .Scan(
            new Tuple<int, Func<int, int>>[0],
            (array, tuple) =>
            {
                //Expensive! Consider taking a dependency on immutable collections here!
                var list = array.ToList();

                if (tuple.Previous != null)
                    list.Remove(tuple.Previous);

                if (tuple.Current != null)
                    list.Add(tuple.Current);

                return list.ToArray();
            })
            // Extract only the actual functions
        .Select(x => x.Select(y => y.Item2).ToArray())
        // Finally, to make sure that values are passed even when no transformer has been observed
        // start this sequence with the neutral transformation.
        // IMPORTANT: You should test what happens when the first value is observed very quickly. There might be timing issues.
        .StartWith(Scheduler.Immediate, new[] { new Func<int, int>[0]});

Then combine it with the values using CombineVeryLatest, an operator that is not built into Rx.

var transformedValues = values
    .CombineVeryLatest(transformerArrayObservable, (value, transformers) =>
    {
        return transformers
            .Aggregate(value, (current, transformer) => transformer(current));
    });


+3

Here is another approach:

        Output = Input
            .WithLatestFrom(
                transformations.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Publish();

Transpose and WithLatestFrom are defined as follows:

    public static IObservable<IObservable<T>> Transpose<T>(this IObservable<IObservable<T>> source)
    {
        return Observable.Create<IObservable<T>>(o =>
        {
            var latestValues = new Dictionary<IObservable<T>, T>();
            var result = new BehaviorSubject<IObservable<T>>(Observable.Empty<T>());

            source.Subscribe(observable =>
            {
                observable.Subscribe(t =>
                {
                    latestValues[observable] = t;
                    result.OnNext(latestValues.ToObservable().Select(kv => kv.Value));
                }, () =>
                {
                    latestValues.Remove(observable);
                });
            });

            return result.Subscribe(o);
        });
    }

    public static IObservable<R> WithLatestFrom<T, U, R>(
        this IObservable<T> source,
        IObservable<U> other,
        Func<T, U, R> combine)
    {
        return Observable.Create<R>(o =>
        {
            var current = new BehaviorSubject<U>(default(U));
            other.Subscribe(current);
            return source.Select(s => combine(s, current.Value)).Subscribe(o);
        });
    }

And a unit test that checks the behavior:

    [TestMethod]
    public void WithLatestFrom_ShouldNotDuplicateEvents()
    {
        var events = new Subject<int>();

        var add1 = new Subject<Func<int, int>>();
        var add2 = new Subject<Func<int, int>>();
        var transforms = new Subject<IObservable<Func<int, int>>>();

        var results = new List<int>();

        events.WithLatestFrom(
                transforms.Transpose(),
                (e, fs) => fs.Aggregate(e, (x, f) => f(x)))
            .SelectMany(x => x)
            .Subscribe(results.Add);


        events.OnNext(1);
        transforms.OnNext(add1);
        add1.OnNext(x => x + 1);
        events.OnNext(1); // 1+1 = 2
        transforms.OnNext(add2);
        add2.OnNext(x => x + 2);
        events.OnNext(1); // 1+1+2 = 4
        add1.OnCompleted();
        events.OnNext(1); // 1+2 = 3
        add2.OnCompleted();
        events.OnNext(1);

        CollectionAssert.AreEqual(new int[] { 1, 2, 4, 3, 1 }, results);
    }
+2

?

, Transformer , , , , ?

Transformer , .

0

Source: https://habr.com/ru/post/1584773/

