How to pass different instances to threads when multithreading?

I am building a scraper. My goal is to launch X browsers (where X is the number of threads) and have each of them start scraping the list of URLs by splitting that list into X parts.

I decided to use 3 threads (3 browsers) with a list of 10 URLs.

Question. How do I split the work between the browsers as follows:

  • Browser1 scrapes items 0 to 3 of the list

  • Browser2 scrapes items 4 to 7 of the list

  • Browser3 scrapes items 8 to 10 of the list

All browsers should work simultaneously, each scraping its share of the submitted URL list.

I already have this BlockingCollection-based worker class:

    public class Multithreading : IDisposable
    {
        BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

        public Multithreading(int workerCount)
        {
            // Create and start a separate Task for each consumer:
            for (int i = 0; i < workerCount; i++)
                Task.Factory.StartNew(Consume);
        }

        public void Dispose()
        {
            _taskQ.CompleteAdding();
        }

        public void EnqueueTask(Action action)
        {
            _taskQ.Add(action);
        }

        void Consume()
        {
            // This sequence that we're enumerating will block when no elements
            // are available and will end when CompleteAdding is called.
            foreach (Action action in _taskQ.GetConsumingEnumerable())
                action(); // Perform task.
        }

        public int ItemsCount()
        {
            return _taskQ.Count;
        }
    }

It can be used as follows:

    Multithreading multithread = new Multithreading(3); // 3 threads

    foreach (string url in urlList)
    {
        multithread.EnqueueTask(new Action(() =>
        {
            startScraping(browser1); // or browser2 or browser3
        }));
    }

I need to create the browser instances beforehand, because I do not want to launch a new browser for every task.
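One way to picture the goal, as a minimal sketch: create the X browsers up front, split the list into X contiguous chunks, and give each browser its own chunk on its own task. The `Browser` class here is a hypothetical stand-in for whatever real browser/scraping API is in use.

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    class ChunkedScrapingSketch
    {
        // Hypothetical stand-in for a real browser instance.
        class Browser
        {
            public int Id;
            public void Scrape(string url) =>
                Console.WriteLine($"Browser {Id} is scraping {url}");
        }

        static void Main()
        {
            int workerCount = 3;
            var urlList = Enumerable.Range(0, 10)
                                    .Select(n => $"http://example.com/page{n}")
                                    .ToList();

            // Create all browsers up front, one per worker.
            var browsers = Enumerable.Range(0, workerCount)
                                     .Select(i => new Browser { Id = i })
                                     .ToArray();

            // Ceiling division: the last chunk may be shorter than the others.
            int chunkSize = (urlList.Count + workerCount - 1) / workerCount;

            var tasks = Enumerable.Range(0, workerCount)
                .Select(i => Task.Run(() =>
                {
                    // Each browser works through its own contiguous slice:
                    // browser 0 gets items 0-3, browser 1 gets 4-7, browser 2 gets 8-9.
                    foreach (string url in urlList.Skip(i * chunkSize).Take(chunkSize))
                        browsers[i].Scrape(url);
                }))
                .ToArray();

            Task.WaitAll(tasks);
        }
    }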

4 answers

Following Henk Holterman's answer, you can go for maximum speed, i.e. keep the browsers maximally busy, with something like this:

    private static void StartScraping(int id, IEnumerable<Uri> urls)
    {
        // Construct browser here
        foreach (Uri url in urls)
        {
            // Use browser to process url here
            Console.WriteLine("Browser {0} is processing url {1}", id, url);
        }
    }

and in the main method:

    int nrWorkers = 3;
    int nrUrls = 10;
    BlockingCollection<Uri> taskQ = new BlockingCollection<Uri>();

    foreach (int i in Enumerable.Range(0, nrWorkers))
    {
        Task.Run(() => StartScraping(i, taskQ.GetConsumingEnumerable()));
    }

    foreach (int i in Enumerable.Range(0, nrUrls))
    {
        taskQ.Add(new Uri(String.Format("http://Url{0}", i)));
    }

    taskQ.CompleteAdding();
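Note that this queue-based approach balances the load dynamically rather than using the fixed 0-3 / 4-7 / 8-10 split from the question: each browser pulls the next URL as soon as it finishes the previous one, so one slow page cannot stall a whole chunk. Capturing i in the Task.Run lambda is safe here, because since C# 5 the foreach loop variable is scoped per iteration.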

I believe the usual approach is to have a single blocking queue, a producer thread, and an arbitrary pool of worker threads.

The producer thread is responsible for adding the URLs to the queue. It blocks when there are none to add.

A worker thread creates a browser instance, then takes one URL from the queue, scrapes it, and loops back for the next. It blocks when the queue is empty.

You can run as many workers as you want, and they simply sort the work out between themselves.

The main thread starts all the other threads and then gets out of the way. It takes care of the user interface, if there is one.

Multithreading can be very difficult to debug. You may want to look at using Tasks for at least part of the job.
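A minimal sketch of that layout using Tasks, assuming a hypothetical Browser class where a real browser automation API would go: a single BlockingCollection<string> as the queue, worker tasks that each construct their own browser, and the main thread acting as the producer.

    using System;
    using System.Collections.Concurrent;
    using System.Linq;
    using System.Threading.Tasks;

    class ProducerConsumerSketch
    {
        // Hypothetical stand-in for a real browser.
        class Browser
        {
            public void Scrape(string url) => Console.WriteLine($"Scraping {url}");
        }

        static void Main()
        {
            var queue = new BlockingCollection<string>();

            // Workers: each creates its own browser, then drains the queue.
            // GetConsumingEnumerable blocks while the queue is empty and ends
            // once CompleteAdding has been called and the queue is drained.
            var workers = Enumerable.Range(0, 3)
                .Select(_ => Task.Run(() =>
                {
                    var browser = new Browser();
                    foreach (string url in queue.GetConsumingEnumerable())
                        browser.Scrape(url);
                }))
                .ToArray();

            // Producer: the main thread feeds the queue, then signals completion.
            foreach (int n in Enumerable.Range(0, 10))
                queue.Add($"http://example.com/page{n}");
            queue.CompleteAdding();

            Task.WaitAll(workers);
        }
    }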


You can give the tasks an Id, and the workers as well. Then you would have a BlockingCollection<Action>[] instead of a single BlockingCollection<Action>. Each consumer consumes from its own BlockingCollection in the array. Your job is then to find the right consumer and post the work:

    BlockingCollection<Action>[] _taskQ;
    private int taskCounter = -1;

    public Multithreading(int workerCount)
    {
        _taskQ = new BlockingCollection<Action>[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            int workerId = i; // To avoid the closure issue
            _taskQ[workerId] = new BlockingCollection<Action>();
            Task.Factory.StartNew(() => Consume(workerId));
        }
    }

    public void EnqueueTask(Action action)
    {
        int value = Interlocked.Increment(ref taskCounter);
        int index = value / 4; // Your own logic to find the index here
        _taskQ[index].Add(action);
    }

    void Consume(int workerId)
    {
        foreach (Action action in _taskQ[workerId].GetConsumingEnumerable())
            action(); // Perform task.
    }
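With the 10 URLs from the question, value / 4 sends tasks 0-3 to worker 0, tasks 4-7 to worker 1 and tasks 8 and 9 to worker 2, which is effectively the split asked for. Be aware that with 3 workers this particular formula overflows the array at the 13th task (12 / 4 = 3), so for other list sizes the index logic should be derived from the actual list length and worker count.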

A simple solution using BackgroundWorkers, which limits the number of threads:

    public class Scraper : IDisposable
    {
        // Both collections must be initialized, otherwise the constructor
        // throws a NullReferenceException.
        private readonly BlockingCollection<Action> tasks = new BlockingCollection<Action>();
        private readonly IList<BackgroundWorker> workers = new List<BackgroundWorker>();

        public Scraper(IList<Uri> urls, int numberOfThreads)
        {
            for (var i = 0; i < urls.Count; i++)
            {
                var url = urls[i];
                tasks.Add(() => Scrape(url));
            }

            for (var i = 0; i < numberOfThreads; i++)
            {
                var worker = new BackgroundWorker();
                worker.DoWork += (sender, args) =>
                {
                    Action task;
                    while (tasks.TryTake(out task))
                    {
                        task();
                    }
                };
                workers.Add(worker);
                worker.RunWorkerAsync();
            }
        }

        public void Scrape(Uri url)
        {
            Console.WriteLine("Scraping url {0}", url);
        }

        public void Dispose()
        {
            tasks.CompleteAdding();
            tasks.Dispose();
        }
    }
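A hedged usage sketch (the URL list here is made up; usings for System, System.Collections.Concurrent, System.Collections.Generic, System.ComponentModel and System.Linq are assumed):

    var urls = Enumerable.Range(0, 10)
                         .Select(n => new Uri($"http://example.com/page{n}"))
                         .ToList();

    // Three BackgroundWorkers drain the shared task collection in parallel.
    var scraper = new Scraper(urls, 3);

Note that TryTake returns false immediately once the collection is empty, so the workers exit as soon as the pre-filled task list is drained; unlike the GetConsumingEnumerable designs above, this one cannot accept new URLs after construction.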

Source: https://habr.com/ru/post/1200526/

