Extremely Long Rx Event Chains

I am working on what can best be described as a modeling / workflow / game logic engine (although it does not fall directly into any of these categories).

The goal is for it to be fully event-driven (reactive), and it should support tens or even hundreds of thousands of events with chaining, branching, filtering and concurrency, and all that Rx goodness.

I am very new to Reactive Extensions and decided to write the simplest test I could think of (chaining a bunch of ISubjects together). I quickly found that chaining too many of them (about 12,000 in my case) throws a StackOverflowException. That makes sense to me when I look at what Rx is really doing: it is just wiring up event handlers in various ways, and a call stack can only be so deep.

So I'm looking for a way (a Reactive-ish one?) around this limitation. I cannot be the only one who wants to do something extremely large with this framework. Any help the community can provide will be greatly appreciated.

Here is my test code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Subjects;

class Program
{
    static void Main(string[] args)
    {
        for (int i = 0; i < 1000000; i += 1000)
        {
            Console.Write($"{i} ");
            using (Dynamite dynamite = new Dynamite())
            {
                dynamite.Setup(i);
                dynamite.Trigger();
            }
        }
        Console.ReadKey();

    }
}

public class Dynamite : IDisposable
{
    ISubject<bool> start = null;
    IList<IDisposable> handles = new List<IDisposable>();

    public void Setup(int length)
    {
        length = length == 0 ? 1 : length;

        var fuses =
            Enumerable.Range(0, length)
            .Select(v => new Subject<bool>())
            .ToArray();

        ISubject<bool> prev = null;

        foreach (var fuse in fuses)
        {
            //Console.Write(".");

            if (prev != null)
            {
                Attach(prev, fuse);
            }
            prev = fuse;
        }

        start = fuses.First();
        var end = fuses.Last();

        handles.Add(
            end
                .Subscribe(onNext: b =>
                {
                    //Console.Write("t");
                    this.Explode();
                }));
    }

    void Attach(ISubject<bool> source, ISubject<bool> dest)
    {
        var handle = source
            .Subscribe(onNext: b =>
             {
                 //Console.Write("s");
                 dest.OnNext(b);
             });
        handles.Add(handle);
    }

    public void Trigger()
    {
        //Console.Write("p");
        start.OnNext(true);
    }

    void Explode()
    {
        Console.WriteLine("...BOOM!");
    }

    public void Dispose()
    {
        foreach (var h in handles)
            h.Dispose();
    }
}

Here's the console output:

0 ...BOOM!
1000 ...BOOM!
2000 ...BOOM!
3000 ...BOOM!
4000 ...BOOM!
5000 ...BOOM!
6000 ...BOOM!
7000 ...BOOM!
8000 ...BOOM!
9000 ...BOOM!
10000 ...BOOM!
11000 ...BOOM!
12000
Process is terminated due to StackOverflowException.
1 answer

Use the CurrentThread scheduler:

.ObserveOn(Scheduler.CurrentThread)

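The CurrentThreadScheduler [...] acts as a trampoline: work scheduled while another action is already running on the thread is queued and runs after that action returns, instead of being invoked as a nested call.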
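As a rough sketch of how that suggestion could be applied to the test in the question, the chain below inserts `.ObserveOn(Scheduler.CurrentThread)` between each pair of subjects. It assumes the System.Reactive NuGet package is referenced. The class name `TrampolinedChain` and the chain length of 100,000 are made up for illustration and do not come from the original post.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the Dynamite class from the question.
class TrampolinedChain
{
    static void Main()
    {
        // Assumed length, chosen to be well past the ~12,000 links
        // that overflowed the stack in the question.
        const int length = 100000;

        var fuses = Enumerable.Range(0, length)
            .Select(_ => new Subject<bool>())
            .ToArray();

        var handles = new List<IDisposable>();

        // Link each fuse to the next one. ObserveOn hands every
        // notification to the current-thread scheduler, so the next
        // OnNext is queued rather than called from inside the previous one.
        for (int i = 1; i < fuses.Length; i++)
        {
            var dest = fuses[i];
            handles.Add(
                fuses[i - 1]
                    .ObserveOn(Scheduler.CurrentThread)
                    .Subscribe(b => dest.OnNext(b)));
        }

        // The end of the chain reports that the signal arrived.
        handles.Add(
            fuses[fuses.Length - 1]
                .Subscribe(_ => Console.WriteLine("...BOOM!")));

        // Light the first fuse.
        fuses[0].OnNext(true);

        foreach (var h in handles)
            h.Dispose();
    }
}
```

The trade-off is that every hop now goes through the scheduler's queue, which costs some allocation and time per link, but the call stack should no longer grow with the length of the chain.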


Source: https://habr.com/ru/post/1688033/

