C # Wait for all threads to complete in ThreadPool

I have one main thread and many other background threads.

The main use of these background threads is to request data (there are a lot of requests from the Internet, so I create several threads: to avoid lagging the user interface).

When it comes to exporting data to the main thread (user interface), I need to wait for the rest of the threads to finish.

My code is:

//...code to open save file dialog... //this loop is to wait for all the threads finish their query //QueryThread.threadCount is the count of the background threads while (QueryThread.threadCount != 0) { Thread.CurrentThread.Join(1000); Console.WriteLine(QueryThread.threadCount); } //...code to export data... 

If I comment on the while loop, the program will run smoothly, but some of my exported data will be able to display some β€œunwanted” materials, as some of the background threads have not yet completed their work.

However, the while loop described above is infinite, threadCount never changes, which means that during the "Join ()" method the background thread does not work.

Why are background threads blocked and how can I solve the problem?

Thanks a lot!

+4
source share
5 answers

Your implementation is incorrect, and you should not use Join as a synchronization primitive between your threads.

What you need to do is implement the producer-consumer pattern. This will allow you to have threads waiting to complete the work, and then come to life to do this work when you put it in the queue.

However, the change I will make is that from the user interface thread, do not add data directly to the queue owned by the producer and consumer. Rather, make a copy of this data, and then queue it.

For more information on how to implement the producer-consumer pattern in .NET, I suggest you familiarize yourself with the MSDN documentation entitled " How to Synchronize the Manufacturer and Consumer Thread (C # Programming Guide) "

+2
source

You call the Join method on the current thread, which does not make much sense. You should call it worker threads:

 foreach (Thread thread in workerThreads) { thread.Join(1000); } 

Unfortunately, this view strikes the purpose of using threads, as it blocks the call until all other threads are completed.

The RunWorkerCompleted BackgroundWorker event can be used to notify you that a background task has completed and updates are performed on the form.

+3
source

I think you want to look at the alarm. Have signals (ManualResetEvent / AutoResetEvent) for your streams. Set () the associated sign of the signal in the workflow when this is done. In the main thread, run `WaitAll (signal1, signal2, signal3) 'to wait for your workflows to complete.

Hope this helps,

+2
source

I could not try myself. I am sure there is room for improvement, but I think this shows how to deal with some issues with multiple threads, including the original question.

Form.cs

 namespace STAFormWithThreadPoolSync { internal delegate void WorkerEvent(WorkerEventInfo info); public partial class Form1 : Form { // We'll create a state object for each worker process List<WorkerState> workerStates = new List<WorkerState>(); public Form1() { InitializeComponent(); } // Executed in the main thread private void button1_Click(object sender, EventArgs e) { workersList.Items.Clear(); // Read the amount of thread we should start from the form int threadCountToUse = (int)ThreadCount.Value; WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured); // Start up all threads for (int counter = 0; counter < threadCountToUse; ++counter) { // An object we can pass values into for the worker process to use. WorkerState workerState = new WorkerState(); workerState.OnStarted += woEvent; workerState.OnFinished += woEvent; // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!) workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); // Store the state object for later use. workerStates.Add(workerState); } WorkersProgress.Minimum = 0; WorkersProgress.Maximum = workerStates.Count; workerStates.ForEach(workerState => { // Fire of the worker thread (with the state object) ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState); } ); button1.Enabled = false; CurrentResult.Value = 0; CurrentResultLabel.Text = "Current value"; ProgressTimer.Start(); } // event is run on the callers thread, so carefull accessing our controls on our form. internal void workerEventOccured(WorkerEventInfo info) { if (this.workersList.InvokeRequired) { WorkerEvent workerEvent = new WorkerEvent(workerEventOccured); this.Invoke(workerEvent, new object[] { info }); } else { switch (info.eventType) { case EventType.WorkerStarted: this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId)); break; case EventType.WorkerEnded: this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId)); break; case EventType.AllWorkersFinished: this.workersList.Items.Add("ALL workers finished"); ProgressTimer.Stop(); button1.Enabled = true; CurrentResultLabel.Text = "Final value"; break; } } } // Executed in threadpool thread. private void ProcessItem(object state) { WorkerState workerState = state as WorkerState; int threadId = Thread.CurrentThread.ManagedThreadId; workerState.threadId = threadId.ToString(); WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerStarted; weInfo.workerState = workerState; workerState.Started(weInfo); // Simulate work for ((threadid / 2) seconds. Thread.Sleep((threadId * 500)); // Set the result in the state object to the threadId; workerState.result = threadId; // Signal that this thread is done. workerState.finishSignal.Set(); } // Executed in threadpool thread private void ItemHasFinished(object state, bool timedOut) { // get our state object WorkerState workerState = state as WorkerState; WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerEnded; weInfo.workerState = workerState; workerState.Finished(weInfo); } private void ProgressTimer_Tick(object sender, EventArgs e) { List<WorkerState> removeStates = new List<WorkerState>(); workerStates.ForEach(workerState => { if (workerState.finishSignal.WaitOne(0)) { CurrentResult.Value += workerState.result; removeStates.Add(workerState); } } ); removeStates.ForEach(workerState => { workerState.registeredWaitHandle.Unregister(workerState.finishSignal); workerStates.Remove(workerState); } ); WorkersProgress.Value = workerStates.Count; if (workerStates.Count == 0) { WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.AllWorkersFinished; weInfo.workerState = null; this.workerEventOccured(weInfo); } } } internal class WorkerState { internal string threadId = ""; internal int result = 0; internal RegisteredWaitHandle registeredWaitHandle = null; internal AutoResetEvent finishSignal = new AutoResetEvent(false); internal event WorkerEvent OnStarted = new WorkerEvent( (info) => {}); internal event WorkerEvent OnFinished = new WorkerEvent((info) => { }); internal void Started(WorkerEventInfo info) { OnStarted(info); } internal void Finished(WorkerEventInfo info) { OnFinished(info); this.finishSignal.Set(); } } internal enum EventType { WorkerStarted, WorkerEnded, AllWorkersFinished } internal class WorkerEventInfo { internal EventType eventType; internal WorkerState workerState; } }
namespace STAFormWithThreadPoolSync { internal delegate void WorkerEvent(WorkerEventInfo info); public partial class Form1 : Form { // We'll create a state object for each worker process List<WorkerState> workerStates = new List<WorkerState>(); public Form1() { InitializeComponent(); } // Executed in the main thread private void button1_Click(object sender, EventArgs e) { workersList.Items.Clear(); // Read the amount of thread we should start from the form int threadCountToUse = (int)ThreadCount.Value; WorkerEvent woEvent = new WorkerEvent(this.workerEventOccured); // Start up all threads for (int counter = 0; counter < threadCountToUse; ++counter) { // An object we can pass values into for the worker process to use. WorkerState workerState = new WorkerState(); workerState.OnStarted += woEvent; workerState.OnFinished += woEvent; // Register for the signal (and store its registered wait handle in the stateObj, which we also pass into the parameters!) workerState.registeredWaitHandle = ThreadPool.RegisterWaitForSingleObject(workerState.finishSignal, this.ItemHasFinished, workerState, -1, true); // Store the state object for later use. workerStates.Add(workerState); } WorkersProgress.Minimum = 0; WorkersProgress.Maximum = workerStates.Count; workerStates.ForEach(workerState => { // Fire of the worker thread (with the state object) ThreadPool.QueueUserWorkItem(this.ProcessItem, workerState); } ); button1.Enabled = false; CurrentResult.Value = 0; CurrentResultLabel.Text = "Current value"; ProgressTimer.Start(); } // event is run on the callers thread, so carefull accessing our controls on our form. internal void workerEventOccured(WorkerEventInfo info) { if (this.workersList.InvokeRequired) { WorkerEvent workerEvent = new WorkerEvent(workerEventOccured); this.Invoke(workerEvent, new object[] { info }); } else { switch (info.eventType) { case EventType.WorkerStarted: this.workersList.Items.Add(String.Format("Worker started on thread : {0}", info.workerState.threadId)); break; case EventType.WorkerEnded: this.workersList.Items.Add(String.Format("Worker finished on thread : {0}", info.workerState.threadId)); break; case EventType.AllWorkersFinished: this.workersList.Items.Add("ALL workers finished"); ProgressTimer.Stop(); button1.Enabled = true; CurrentResultLabel.Text = "Final value"; break; } } } // Executed in threadpool thread. private void ProcessItem(object state) { WorkerState workerState = state as WorkerState; int threadId = Thread.CurrentThread.ManagedThreadId; workerState.threadId = threadId.ToString(); WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerStarted; weInfo.workerState = workerState; workerState.Started(weInfo); // Simulate work for ((threadid / 2) seconds. Thread.Sleep((threadId * 500)); // Set the result in the state object to the threadId; workerState.result = threadId; // Signal that this thread is done. workerState.finishSignal.Set(); } // Executed in threadpool thread private void ItemHasFinished(object state, bool timedOut) { // get our state object WorkerState workerState = state as WorkerState; WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.WorkerEnded; weInfo.workerState = workerState; workerState.Finished(weInfo); } private void ProgressTimer_Tick(object sender, EventArgs e) { List<WorkerState> removeStates = new List<WorkerState>(); workerStates.ForEach(workerState => { if (workerState.finishSignal.WaitOne(0)) { CurrentResult.Value += workerState.result; removeStates.Add(workerState); } } ); removeStates.ForEach(workerState => { workerState.registeredWaitHandle.Unregister(workerState.finishSignal); workerStates.Remove(workerState); } ); WorkersProgress.Value = workerStates.Count; if (workerStates.Count == 0) { WorkerEventInfo weInfo = new WorkerEventInfo(); weInfo.eventType = EventType.AllWorkersFinished; weInfo.workerState = null; this.workerEventOccured(weInfo); } } } internal class WorkerState { internal string threadId = ""; internal int result = 0; internal RegisteredWaitHandle registeredWaitHandle = null; internal AutoResetEvent finishSignal = new AutoResetEvent(false); internal event WorkerEvent OnStarted = new WorkerEvent( (info) => {}); internal event WorkerEvent OnFinished = new WorkerEvent((info) => { }); internal void Started(WorkerEventInfo info) { OnStarted(info); } internal void Finished(WorkerEventInfo info) { OnFinished(info); this.finishSignal.Set(); } } internal enum EventType { WorkerStarted, WorkerEnded, AllWorkersFinished } internal class WorkerEventInfo { internal EventType eventType; internal WorkerState workerState; } } 

Form.Designer.cs

 namespace STAFormWithThreadPoolSync { partial class Form1 { /// /// Required designer variable. /// private System.ComponentModel.IContainer components = null; /// /// Clean up any resources being used. /// /// true if managed resources should be disposed; otherwise, false. protected override void Dispose(bool disposing) { if (disposing && (components != null)) { components.Dispose(); } base.Dispose(disposing); } #region Windows Form Designer generated code /// /// Required method for Designer support - do not modify /// the contents of this method with the code editor. /// private void InitializeComponent() { this.components = new System.ComponentModel.Container(); this.button1 = new System.Windows.Forms.Button(); this.ThreadCount = new System.Windows.Forms.NumericUpDown(); this.workersList = new System.Windows.Forms.ListView(); this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader(); this.ProgressTimer = new System.Windows.Forms.Timer(this.components); this.WorkersProgress = new System.Windows.Forms.ProgressBar(); this.CurrentResultLabel = new System.Windows.Forms.Label(); this.CurrentResult = new System.Windows.Forms.NumericUpDown(); this.label2 = new System.Windows.Forms.Label(); ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit(); this.SuspendLayout(); // // button1 // this.button1.Location = new System.Drawing.Point(212, 19); this.button1.Name = "button1"; this.button1.Size = new System.Drawing.Size(93, 23); this.button1.TabIndex = 0; this.button1.Text = "Start threads"; this.button1.UseVisualStyleBackColor = true; this.button1.Click += new System.EventHandler(this.button1_Click); // // ThreadCount // this.ThreadCount.Location = new System.Drawing.Point(23, 21); this.ThreadCount.Minimum = new decimal(new int[] { 2, 0, 0, 0}); this.ThreadCount.Name = "ThreadCount"; this.ThreadCount.Size = new System.Drawing.Size(183, 20); this.ThreadCount.TabIndex = 1; this.ThreadCount.Value = new decimal(new int[] { 4, 0, 0, 0}); // // workersList // this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] { this.WorkerProcessColumn}); this.workersList.Location = new System.Drawing.Point(23, 80); this.workersList.Name = "workersList"; this.workersList.Size = new System.Drawing.Size(486, 255); this.workersList.TabIndex = 3; this.workersList.UseCompatibleStateImageBehavior = false; this.workersList.View = System.Windows.Forms.View.Details; // // WorkerProcessColumn // this.WorkerProcessColumn.Text = "Worker process"; this.WorkerProcessColumn.Width = 482; // // ProgressTimer // this.ProgressTimer.Interval = 200; this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick); // // WorkersProgress // this.WorkersProgress.Location = new System.Drawing.Point(112, 341); this.WorkersProgress.Name = "WorkersProgress"; this.WorkersProgress.Size = new System.Drawing.Size(397, 24); this.WorkersProgress.TabIndex = 4; // // CurrentResultLabel // this.CurrentResultLabel.AutoSize = true; this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266); this.CurrentResultLabel.Name = "CurrentResultLabel"; this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13); this.CurrentResultLabel.TabIndex = 5; this.CurrentResultLabel.Text = "Current Result"; // // CurrentResult // this.CurrentResult.Location = new System.Drawing.Point(581, 282); this.CurrentResult.Maximum = new decimal(new int[] { -1593835520, 466537709, 54210, 0}); this.CurrentResult.Name = "CurrentResult"; this.CurrentResult.ReadOnly = true; this.CurrentResult.Size = new System.Drawing.Size(169, 20); this.CurrentResult.TabIndex = 6; // // label2 // this.label2.AutoSize = true; this.label2.Location = new System.Drawing.Point(25, 352); this.label2.Name = "label2"; this.label2.Size = new System.Drawing.Size(81, 13); this.label2.TabIndex = 7; this.label2.Text = "processing load"; // // Form1 // this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F); this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font; this.ClientSize = new System.Drawing.Size(762, 377); this.Controls.Add(this.label2); this.Controls.Add(this.CurrentResult); this.Controls.Add(this.CurrentResultLabel); this.Controls.Add(this.WorkersProgress); this.Controls.Add(this.workersList); this.Controls.Add(this.ThreadCount); this.Controls.Add(this.button1); this.Name = "Form1"; this.Text = "Form1"; ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit(); this.ResumeLayout(false); this.PerformLayout(); } #endregion private System.Windows.Forms.Button button1; private System.Windows.Forms.NumericUpDown ThreadCount; private System.Windows.Forms.ListView workersList; private System.Windows.Forms.ColumnHeader WorkerProcessColumn; private System.Windows.Forms.Timer ProgressTimer; private System.Windows.Forms.ProgressBar WorkersProgress; private System.Windows.Forms.Label CurrentResultLabel; private System.Windows.Forms.NumericUpDown CurrentResult; private System.Windows.Forms.Label label2; } }
namespace STAFormWithThreadPoolSync { partial class Form1 { /// /// Required designer variable. /// private System.ComponentModel.IContainer components = null; /// /// Clean up any resources being used. /// /// true if managed resources should be disposed; otherwise, false. protected override void Dispose(bool disposing) { if (disposing && (components != null)) { components.Dispose(); } base.Dispose(disposing); } #region Windows Form Designer generated code /// /// Required method for Designer support - do not modify /// the contents of this method with the code editor. /// private void InitializeComponent() { this.components = new System.ComponentModel.Container(); this.button1 = new System.Windows.Forms.Button(); this.ThreadCount = new System.Windows.Forms.NumericUpDown(); this.workersList = new System.Windows.Forms.ListView(); this.WorkerProcessColumn = new System.Windows.Forms.ColumnHeader(); this.ProgressTimer = new System.Windows.Forms.Timer(this.components); this.WorkersProgress = new System.Windows.Forms.ProgressBar(); this.CurrentResultLabel = new System.Windows.Forms.Label(); this.CurrentResult = new System.Windows.Forms.NumericUpDown(); this.label2 = new System.Windows.Forms.Label(); ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).BeginInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).BeginInit(); this.SuspendLayout(); // // button1 // this.button1.Location = new System.Drawing.Point(212, 19); this.button1.Name = "button1"; this.button1.Size = new System.Drawing.Size(93, 23); this.button1.TabIndex = 0; this.button1.Text = "Start threads"; this.button1.UseVisualStyleBackColor = true; this.button1.Click += new System.EventHandler(this.button1_Click); // // ThreadCount // this.ThreadCount.Location = new System.Drawing.Point(23, 21); this.ThreadCount.Minimum = new decimal(new int[] { 2, 0, 0, 0}); this.ThreadCount.Name = "ThreadCount"; this.ThreadCount.Size = new System.Drawing.Size(183, 20); this.ThreadCount.TabIndex = 1; this.ThreadCount.Value = new decimal(new int[] { 4, 0, 0, 0}); // // workersList // this.workersList.Columns.AddRange(new System.Windows.Forms.ColumnHeader[] { this.WorkerProcessColumn}); this.workersList.Location = new System.Drawing.Point(23, 80); this.workersList.Name = "workersList"; this.workersList.Size = new System.Drawing.Size(486, 255); this.workersList.TabIndex = 3; this.workersList.UseCompatibleStateImageBehavior = false; this.workersList.View = System.Windows.Forms.View.Details; // // WorkerProcessColumn // this.WorkerProcessColumn.Text = "Worker process"; this.WorkerProcessColumn.Width = 482; // // ProgressTimer // this.ProgressTimer.Interval = 200; this.ProgressTimer.Tick += new System.EventHandler(this.ProgressTimer_Tick); // // WorkersProgress // this.WorkersProgress.Location = new System.Drawing.Point(112, 341); this.WorkersProgress.Name = "WorkersProgress"; this.WorkersProgress.Size = new System.Drawing.Size(397, 24); this.WorkersProgress.TabIndex = 4; // // CurrentResultLabel // this.CurrentResultLabel.AutoSize = true; this.CurrentResultLabel.Location = new System.Drawing.Point(578, 266); this.CurrentResultLabel.Name = "CurrentResultLabel"; this.CurrentResultLabel.Size = new System.Drawing.Size(74, 13); this.CurrentResultLabel.TabIndex = 5; this.CurrentResultLabel.Text = "Current Result"; // // CurrentResult // this.CurrentResult.Location = new System.Drawing.Point(581, 282); this.CurrentResult.Maximum = new decimal(new int[] { -1593835520, 466537709, 54210, 0}); this.CurrentResult.Name = "CurrentResult"; this.CurrentResult.ReadOnly = true; this.CurrentResult.Size = new System.Drawing.Size(169, 20); this.CurrentResult.TabIndex = 6; // // label2 // this.label2.AutoSize = true; this.label2.Location = new System.Drawing.Point(25, 352); this.label2.Name = "label2"; this.label2.Size = new System.Drawing.Size(81, 13); this.label2.TabIndex = 7; this.label2.Text = "processing load"; // // Form1 // this.AutoScaleDimensions = new System.Drawing.SizeF(6F, 13F); this.AutoScaleMode = System.Windows.Forms.AutoScaleMode.Font; this.ClientSize = new System.Drawing.Size(762, 377); this.Controls.Add(this.label2); this.Controls.Add(this.CurrentResult); this.Controls.Add(this.CurrentResultLabel); this.Controls.Add(this.WorkersProgress); this.Controls.Add(this.workersList); this.Controls.Add(this.ThreadCount); this.Controls.Add(this.button1); this.Name = "Form1"; this.Text = "Form1"; ((System.ComponentModel.ISupportInitialize)(this.ThreadCount)).EndInit(); ((System.ComponentModel.ISupportInitialize)(this.CurrentResult)).EndInit(); this.ResumeLayout(false); this.PerformLayout(); } #endregion private System.Windows.Forms.Button button1; private System.Windows.Forms.NumericUpDown ThreadCount; private System.Windows.Forms.ListView workersList; private System.Windows.Forms.ColumnHeader WorkerProcessColumn; private System.Windows.Forms.Timer ProgressTimer; private System.Windows.Forms.ProgressBar WorkersProgress; private System.Windows.Forms.Label CurrentResultLabel; private System.Windows.Forms.NumericUpDown CurrentResult; private System.Windows.Forms.Label label2; } } 

Hope this helps,

+1
source

I solved the problem by changing my approach to the Producer-Consumer model.

Thanks to everyone. Look at the link (provided by casperOne above), but be careful not to follow the microsoft implementation ....

Go here instead will give you a better answer.

Of course, I made some changes, the type of queue is to delegate in my case.

 public static class QueryThread { private static SyncEvents _syncEvents = new SyncEvents(); private static Queue<Delegate> _queryQueue = new Queue<Delegate>(); static Producer queryProducer; static Consumer queryConsumer; public static void init() { queryProducer = new Producer(_queryQueue, _syncEvents); queryConsumer = new Consumer(_queryQueue, _syncEvents); Thread producerThread = new Thread(queryProducer.ThreadRun); Thread consumerThread = new Thread(queryConsumer.ThreadRun); producerThread.IsBackground = true; consumerThread.IsBackground = true; producerThread.Start(); consumerThread.Start(); } public static void Enqueue(Delegate item) { queryQueue.Enqueue(item); } } 

When a request is needed in the main thread, the enqueue delegate points to a function that makes the request by calling Enqueue (the Delegate element). This will add the delegate to the Producer's "private" queue.

The manufacturer will add the elements to the general queue in the appropriate case (for example, generate a random number and put it in the general queue in the msdn example).

The consumer deactivates the delegates and starts them.

Thank you all for your help. =]

0
source

Source: https://habr.com/ru/post/1301668/


All Articles