Limiting concurrent requests using Rx and SelectMany

I have a list of URLs that I want to download concurrently using HttpClient. The list can be large (100 or more!).

I have this code:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;

var urls = new List<string>
{
    "http://www.amazon.com", "http://www.bing.com", "http://www.facebook.com",
    "http://www.twitter.com", "http://www.google.com"
};
var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);
```

Problem: because of SelectMany, a large number of tasks are started almost simultaneously. It seems that if the list of URLs is large enough, many of these tasks time out (I get "A task was canceled" exceptions).

So I thought there should be a way, perhaps using some kind of Scheduler, to limit the number of simultaneous tasks to no more than 5 or 6 at a time.

That way I could still download files concurrently without launching so many tasks that they start failing, as happens now.

How can I do this without getting swamped by timeouts?

Many thanks.

3 answers

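Remember that SelectMany() is essentially Select().Merge(). SelectMany has no maxConcurrent parameter, but Merge() does, so you can use that instead. Wrapping each request in Observable.FromAsync is what makes the limit effective: it defers starting a request until Merge subscribes to it.

In your example, you can do this: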

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;

var urls = new List<string>
{
    "http://www.amazon.com", "http://www.bing.com", "http://www.facebook.com",
    "http://www.twitter.com", "http://www.google.com"
};
var client = new HttpClient();

var contents = urls
    .ToObservable()
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // at most 2 concurrent requests

contents.Subscribe(Console.WriteLine);
```
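To see the difference between SelectMany and Select + Merge(maxConcurrent) without hitting real sites, here is a minimal sketch that swaps client.GetStringAsync for a hypothetical FakeDownloadAsync (a 100 ms Task.Delay) and records the peak number of downloads in flight. It assumes the System.Reactive NuGet package and C# 9+ top-level statements; the item count and delay are arbitrary.

```csharp
// Assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

var gate = new object();
int inFlight = 0, peak = 0;

// Hypothetical stand-in for client.GetStringAsync: a simulated 100 ms download
// that records the highest number of downloads running at the same time.
async Task<string> FakeDownloadAsync(int id)
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(100);
    lock (gate) { inFlight--; }
    return $"page {id}";
}

// SelectMany: each download starts as soon as its item arrives.
var all = await Observable.Range(1, 10)
    .SelectMany(id => FakeDownloadAsync(id))
    .ToList();
int selectManyPeak = peak;

// Select + FromAsync + Merge(2): FromAsync defers each download until Merge
// subscribes to it, and Merge keeps at most 2 inner subscriptions active.
lock (gate) { peak = 0; }
var throttled = await Observable.Range(1, 10)
    .Select(id => Observable.FromAsync(() => FakeDownloadAsync(id)))
    .Merge(2)
    .ToList();
int mergePeak = peak;

Console.WriteLine($"SelectMany peak: {selectManyPeak}, Merge(2) peak: {mergePeak}");
```

In this setup the SelectMany run typically has all 10 simulated downloads in flight at once, while the Merge(2) run never has more than 2.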

Here is an example of how you can do this using the TPL Dataflow API:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static Task DoIt()
    {
        var urls = new List<string>
        {
            "http://www.amazon.com", "http://www.bing.com", "http://www.facebook.com",
            "http://www.twitter.com", "http://www.google.com"
        };
        var client = new HttpClient();

        // Create a block that takes a URL as input
        // and produces the download result as output
        TransformBlock<string, string> downloadBlock = new TransformBlock<string, string>(
            uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)),
            new ExecutionDataflowBlockOptions
            {
                // At most 2 download operations execute at the same time
                MaxDegreeOfParallelism = 2
            });

        // Create a block that prints out the result
        ActionBlock<string> doneBlock = new ActionBlock<string>(x => Console.WriteLine(x));

        // Link the output of the first block to the input of the second one
        downloadBlock.LinkTo(doneBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Input the URLs into the first block
        foreach (var url in urls)
        {
            downloadBlock.Post(url);
        }

        downloadBlock.Complete(); // Mark completion of input

        // Allows the consumer to wait for the whole operation to complete
        return doneBlock.Completion;
    }

    static void Main(string[] args)
    {
        DoIt().Wait();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
}
```
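The same kind of check can be run against the Dataflow version. This is a minimal sketch, assuming the System.Threading.Tasks.Dataflow package and C# 9+ top-level statements; FakeDownloadAsync is a hypothetical stand-in for client.GetStringAsync and the example.com URLs are placeholders, so nothing touches the network.

```csharp
// Assumes the System.Threading.Tasks.Dataflow package and C# 9+ top-level statements.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int inFlight = 0, peak = 0;

// Hypothetical stand-in for client.GetStringAsync: a simulated 100 ms download.
async Task<string> FakeDownloadAsync(string url)
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(100);
    lock (gate) { inFlight--; }
    return $"contents of {url}";
}

var downloadBlock = new TransformBlock<string, string>(
    url => FakeDownloadAsync(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// Collect results instead of printing them, so they can be counted afterwards.
var results = new ConcurrentBag<string>();
var doneBlock = new ActionBlock<string>(s => results.Add(s));

downloadBlock.LinkTo(doneBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
    downloadBlock.Post($"https://example.com/page{i}"); // placeholder URLs
downloadBlock.Complete();

await doneBlock.Completion;
Console.WriteLine($"Downloaded {results.Count}, peak concurrency {peak}");
```

For an async delegate, the TransformBlock counts an item as being processed until the returned Task completes, which is why MaxDegreeOfParallelism also bounds the number of downloads in flight.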

See if this helps:

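```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

var urls = new List<string>
{
    "http://www.amazon.com", "http://www.bing.com", "http://www.facebook.com",
    "http://www.twitter.com", "http://www.google.com"
};

// Each request gets its own HttpClient, which Observable.Using disposes
// when that request's sequence terminates.
// Note: SelectMany still subscribes to every request immediately.
var contents = urls
    .ToObservable()
    .SelectMany(uri => Observable.Using(
        () => new System.Net.Http.HttpClient(),
        client => client
            .GetStringAsync(new Uri(uri, UriKind.Absolute))
            .ToObservable()));

contents.Subscribe(Console.WriteLine);
```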
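If you prefer the per-request Observable.Using pattern from this answer, it can be combined with Merge(maxConcurrent) from the first answer so that each request is both throttled and paired with its own disposable resource. This is a minimal sketch, assuming the System.Reactive package and C# 9+ top-level statements; Disposable.Create stands in for new HttpClient() and Task.Delay for the download, both as hypothetical placeholders. Note that Microsoft's guidance generally favors reusing a single HttpClient over creating one per request.

```csharp
// Assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

var gate = new object();
int inFlight = 0, peak = 0;

var results = await Observable.Range(1, 10)
    .Select(id => Observable.Using(
        // Hypothetical stand-in for "new HttpClient()": created only when Merge
        // subscribes, and disposed after this request's sequence terminates.
        () => Disposable.Create(() => Console.WriteLine($"client {id} disposed")),
        client => Observable.FromAsync(async () =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(100); // simulated download using "client"
            lock (gate) { inFlight--; }
            return $"page {id}";
        })))
    .Merge(2) // at most 2 requests subscribed at once
    .ToList();

Console.WriteLine($"Downloaded {results.Count}, peak concurrency {peak}");
```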

Source: https://habr.com/ru/post/1203673/

