Processing requests at a maximum rate

I am using Rx to ensure that our backend respects the request limits of a third-party API.

The implementation below uses a simple Subject<T> as an input queue, which is then tamed using James World's custom Pace operator.

This works, BUT only as long as throttledRequests is not observed on the main thread, which is enforced by ObserveOn(TaskPoolScheduler.Default).

As soon as I comment out this line (line 61), the program behaves as if the Pace operator were not used at all, and requests are again processed as fast as they are queued. Can anyone explain this behavior?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    public static class ObservableExtensions
    {
        /// <summary>
        /// James World's Pace operator (see /questions/1184340/process-rx-events-at-fixed-or-minimum-intervals/3874779#3874779)
        /// </summary>
        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
        {
            return source.Select(i => Observable.Empty<T>()
                    .Delay(interval)
                    .StartWith(i))
                .Concat();
        }
    }

    class Program
    {
        ISubject<int> requests;
        IObservable<int> throttledRequests;

        private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
        {
            var task = throttledRequests
                .Where(x => x == work)
                .Take(1)
                .SelectMany(doWork)
                .ToTask();

            // queue it
            requests.OnNext(work);

            return task;
        }

        private Task<int> DoRequest(int x)
        {
            Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult(x);
        }

        private void Run()
        {
            // initialize request queue
            requests = new Subject<int>();

            // create a derived rate-limited queue
            throttledRequests = requests
                .Pace(TimeSpan.FromMilliseconds(1000))
                .Publish()
                .RefCount()
                .ObserveOn(TaskPoolScheduler.Default);

            Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);

            int i = 0;

            while (true)
            {
                // Queue a number of requests
                var tasks = Enumerable.Range(i * 10, 10)
                    .Select(x => QueueRequest(x, DoRequest))
                    .ToArray();

                Task.WaitAll(tasks);

                Console.ReadLine();
                i++;
            }
        }

        static void Main(string[] args)
        {
            new Program().Run();
        }
    }
}

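I can't fully answer the question (in particular, I'm not sure why it appears to work when run on ThreadPoolScheduler), but I'll share my thoughts and show how to make it behave as expected with or without ThreadPoolScheduler.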

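First, note that even with ThreadPoolScheduler it does not work correctly: the first 1-3 items are usually processed without any delay. Why the delay kicks in after that is still not clear to me. Now to the cause. Consider the following example: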

var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;

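Here there is no delay, and the task completes immediately. Why? Because StartWith injects "1" at the start of the sequence, and Take(1) takes that value and completes: there is no reason to continue with the rest of the sequence, so the delay never runs. With Take(2) instead, it would take 10 seconds to complete.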
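To make the difference between Take(1) and Take(2) observable, here is a minimal sketch that times both variants of the one-liner above. The class name TakeTimingDemo and the helper LastValue are hypothetical, and the delay is shortened to 2 seconds (an assumption made only to keep the run short).

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

class TakeTimingDemo
{
    // Hypothetical helper: the sequence from the example above,
    // with the Take count and the delay made configurable.
    static int LastValue(int takeCount, TimeSpan delay)
    {
        return Observable.Range(0, 10)
            .Delay(delay)
            .StartWith(1)
            .Take(takeCount)
            .ToTask()
            .Result; // blocks until the sequence completes and yields its last value
    }

    static void Main()
    {
        // Assumption: 2 s instead of the 10 s used in the example above.
        var delay = TimeSpan.FromSeconds(2);

        var clock = Stopwatch.StartNew();
        int withTakeOne = LastValue(1, delay);
        Console.WriteLine("Take(1) -> {0} after {1} ms", withTakeOne, clock.ElapsedMilliseconds);

        clock.Restart();
        int withTakeTwo = LastValue(2, delay);
        Console.WriteLine("Take(2) -> {0} after {1} ms", withTakeTwo, clock.ElapsedMilliseconds);
    }
}
```

The Take(1) call should report the value 1 almost at once, because the sequence completes on the value supplied by StartWith. The Take(2) call should report 0, the first delayed value from Range, only after roughly the configured delay.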

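For the same reason, your code never reaches the delay (you can check this, for example, by logging to the console after Delay). To fix it, remove Take(1) (or replace it with Take(2)), since each request value occurs only once anyway. With that change, the code works correctly with or without ThreadPoolScheduler.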
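The following sketch is not the original program. Under the assumption of a single subscriber that stays subscribed (no Take(1), no Publish/RefCount), it only illustrates that the Pace operator from the question does space items out when nothing completes the subscription early. The class name PaceDemo, the 500 ms interval, and the item count are hypothetical choices.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static class PaceDemo
{
    // Same Pace operator as in the question.
    public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
    {
        return source.Select(i => Observable.Empty<T>()
                .Delay(interval)
                .StartWith(i))
            .Concat();
    }

    static void Main()
    {
        var requests = new Subject<int>();
        var clock = Stopwatch.StartNew();

        // One long-lived subscription: the pending delay of the current
        // inner sequence is not disposed between items.
        using (requests
            .Pace(TimeSpan.FromMilliseconds(500))
            .Subscribe(x => Console.WriteLine("{0,5} ms: {1}", clock.ElapsedMilliseconds, x)))
        {
            // Queue four items back to back on the main thread.
            for (int i = 0; i < 4; i++)
            {
                requests.OnNext(i);
            }

            // Keep the subscription alive long enough for all items to drain.
            Thread.Sleep(2500);
        }
    }
}
```

The first item should print right away and the remaining ones about 500 ms apart, even though all four were queued synchronously on the main thread without any ObserveOn.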


Source: https://habr.com/ru/post/1608284/

