I am trying to associate a parallel task with a listView that contains pprogressBars. I use a limited scheduler that allows only the specified maximum parallel degree. Until now, it has been working fine most of the time, but sometimes two tasks update the same progress bar in the View list. Below is my code.
Any idea how to prevent two tasks from updating from the same performance level in the View list? Or how to update the progress bar from tasks running at the same time?
public class MyClass
{
public ObservableCollection<StatusInfo> StatusItems { get; set; }
private Object thisLock = new Object();
public int Process()
{
StatusItems = new ObservableCollection<StatusInfo>();
for (int i = 0; i < 4; i++)
{
StatusInfo sInfo = new StatusInfo();
sInfo.ThreadID = i;
sInfo.Table = "No Table";
sInfo.Percentage = 0;
sInfo.Status = AppInfo.AVAILABLE;
sInfo.Minimum = 0;
sInfo.Maximum = 100;
sInfo.Visibility = Visibility.Visible;
StatusItems.Add(sInfo);
}
Parent.StatusItems = StatusItems;
int numberOfBackGroundThread = 4;
LimitedTaskScheduler scheduler = new LimitedTaskScheduler(numberOfBackGroundThread);
TaskFactory factory = new TaskFactory(scheduler);
var ui = LimitedTaskScheduler.FromCurrentSynchronizationContext();
Task[] tasks = new Task[rows.Count];
for (int i = 0; i < rows.Count; i++)
{
......
tasks[i] = factory.StartNew<string>(() =>
{
int barIndex = -1;
int threadID = Thread.CurrentThread.ManagedThreadId;
cnt++;
if (cnt > numberOfBackGroundThread - 1)
{
while (true)
{
for (int j = 0; j < numberOfBackGroundThread; j++)
{
lock (thisLock)
{
if (StatusItems[j].Status == "AVAILABLE" || StatusItems[j].Status == "DONE")
{
StatusItems[j].Status = "Occupied";
barIndex = j;
break;
}
}
}
if (barIndex >= 0) { break; }
}
}
else { barIndex = cnt; }
StatusItems[barIndex].TabType = tabType;
StatusItems[barIndex].ThreadID = threadID;
int nStatus = IndividualProcess(barIndex);
if (nStatus < 0) { AppInfo.JobStatus = "01"; }
return result;
});
}
var done = factory.ContinueWhenAll(tasks, completedTasks => { AppInfo.Finished = true; });
done.ContinueWith(completedTasks => { int nStatus = PostProcess(); }, ui);
return returnStatus;
}
private int IndividualProcess(int barIndex)
{
for (int i=0; i< 100; i++)
{
perform work...
SetProgressbar (i, StatusItems, barIndex, "in progress")
}
SetProgressbar (100, StatusItems, barIndex, "DONE")
}
public void SetProgressbar(int pPercent, ObservableCollection<StatusInfo> pInfo, int pInfoIndex, string pStatus)
{
try
{
if (Application.Current.Dispatcher.Thread != System.Threading.Thread.CurrentThread)
{
Application.Current.Dispatcher.BeginInvoke(new Action(() =>
{
((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
((StatusInfo)pInfo[pInfoIndex]).Status = pStatus;
((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
}));
}
else
{
((StatusInfo)pInfo[pInfoIndex]).Percentage = pPercent;
((StatusInfo)pInfo[pInfoIndex]).Status = pStatus;
((StatusInfo)pInfo[pInfoIndex]).PCT = pPercent.ToString() + "%";
}
}
catch { throw; }
}
}
public class LimitedTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
private readonly LinkedList<Task> _tasks = new LinkedList<Task>();
private readonly int _maxDegreeOfParallelism;
private int _delegatesQueuedOrRunning = 0;
public LimitedTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected sealed override void QueueTask(Task task)
{
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
_currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
lock (_tasks)
{
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
base.TryExecuteTask(item);
}
}
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (!_currentThreadIsProcessingItems) return false;
if (taskWasPreviouslyQueued) TryDequeue(task);
return base.TryExecuteTask(task);
}
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
Update: Task Scheduler may not be relevant. I put it here, in case someone finds something new that I never thought about doing or missing.