How to implement your own extended producer / consumer scenario?


Note:
I have completely rewritten my question. You can see the original question in the edit history.


I need a "mighty" queue that provides the following functionality:

  • I have a certain scope for a group of objects. This means that Group A, Group B, ... will each have their own queue
  • I fill a queue in a group-scoped thread, Thread A (producer)
  • I read a queue in a group-scoped thread, Thread B (consumer)

So I will have the following scenarios:

  • there is no item in the queue and there never will be (because the jobs were called with an empty "target group"): Thread B should exit its loop
  • there is currently no item in the queue, because Thread A is still working on the item to enqueue: Thread B should wait
  • there are items in the queue: Thread B should be able to dequeue and process an item
  • there is no item in the queue, because Thread A has no more items to enqueue: Thread B should exit its loop

I have now come up with the following implementation:

    public class MightyQueue<T> where T : class
    {
        private readonly Queue<T> _queue = new Queue<T>();
        private bool? _runable;
        private volatile bool _completed;

        public bool Runable
        {
            get
            {
                while (!this._runable.HasValue)
                {
                    Thread.Sleep(100);
                }
                return this._runable ?? false;
            }
            set { this._runable = value; }
        }

        public void Enqueue(T item)
        {
            if (item == null)
            {
                throw new ArgumentNullException("item");
            }
            this._queue.Enqueue(item);
        }

        public void CompleteAdding()
        {
            this._completed = true;
        }

        public bool TryDequeue(out T item)
        {
            if (!this.Runable)
            {
                item = null;
                return false;
            }
            while (this._queue.Count == 0)
            {
                if (this._completed)
                {
                    item = null;
                    return false;
                }
                Thread.Sleep(100);
            }
            item = this._queue.Dequeue();
            return true;
        }
    }

which would then be used like this:

Producer

    if (!anythingToWorkOn)
    {
        // nothing will ever be queued: let the consumer return immediately
        myFooMightyQueueInstance.Runable = false;
    }
    else
    {
        myFooMightyQueueInstance.Runable = true;
        while (condition)
        {
            myFooMightyQueueInstance.Enqueue(item);
        }
        myFooMightyQueueInstance.CompleteAdding();
    }

Consumer

    if (!myFooMightyQueueInstance.Runable)
    {
        return;
    }

    T item;
    while (myFooMightyQueueInstance.TryDequeue(out item))
    {
        // work with item
    }

But I believe this approach is wrong, because I'm using Thread.Sleep() in there (couldn't it be some WaitHandle or something similar?), not to mention the algorithm itself. Can anyone help me out?

+1
5 answers

You should start with a generic producer-consumer queue and use that. Implementing this inside the queue is not such a good idea, as it prevents you from using semaphores to signal threads (or you could expose public semaphores in your queue, but that's a really bad idea).

As soon as thread A has enqueued a single work item, it should signal a semaphore to notify thread B. When thread B has finished processing all items, it should signal a second semaphore to notify everyone else that it has finished. Your main thread should wait on this second semaphore to know that everything is done.
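The two-semaphore handshake described above might look roughly like the following. This is only a sketch under a few assumptions: the names `SemaphoreDemo`, `itemsAvailable` and `allDone` are made up for illustration, `SemaphoreSlim` requires .NET 4.0 or later, and thread B is told the item count up front to keep the example short (a real consumer would need some end-of-work signal instead).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class SemaphoreDemo
{
    // Runs one producer (thread A) and one consumer (thread B) and returns
    // the items in the order thread B processed them.
    public static List<int> Run(int itemCount)
    {
        var queue = new Queue<int>();
        var processed = new List<int>();
        var itemsAvailable = new SemaphoreSlim(0); // released once per queued item
        var allDone = new SemaphoreSlim(0);        // released once by thread B

        var threadB = new Thread(() =>
        {
            // Simplification: the consumer knows how many items to expect.
            for (int i = 0; i < itemCount; i++)
            {
                itemsAvailable.Wait();             // blocks without polling
                int item;
                lock (queue) { item = queue.Dequeue(); }
                processed.Add(item);
            }
            allDone.Release();                     // notify the main thread
        });
        threadB.Start();

        var threadA = new Thread(() =>
        {
            for (int i = 0; i < itemCount; i++)
            {
                lock (queue) { queue.Enqueue(i); }
                itemsAvailable.Release();          // notify thread B
            }
        });
        threadA.Start();

        allDone.Wait();                            // main thread waits here
        return processed;
    }
}
```

Calling `SemaphoreDemo.Run(5)` blocks the caller until thread B has handled all five items; neither thread ever sleeps in a polling loop.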

[Edit]

First you have a producer and a consumer:

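    public interface IProducer<T> : IStoppable
    {
        /// <summary>
        /// Notifies clients when a new item is produced.
        /// </summary>
        event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
    }

    public interface IConsumer<T> : IStoppable
    {
        /// <summary>
        /// Performs processing of the specified item.
        /// </summary>
        /// <param name="item">The item.</param>
        void ConsumeItem(T item);
    }

    public interface IStoppable
    {
        void Stop();
    }

    // Not shown in the original answer; assumed to be a plain EventArgs
    // wrapper that exposes the produced item.
    public class ProducedItemEventArgs<T> : EventArgs
    {
        public ProducedItemEventArgs(T item) { Item = item; }
        public T Item { get; private set; }
    }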

So, in your case, the class that creates the mails will need to fire the ItemProduced event, and the class that sends them will need to implement ConsumeItem.

You then pass these two instances to an instance of Worker:

    using System;
    using System.Collections.Generic;
    using System.Threading;

    public class Worker<T>
    {
        private readonly object _lock = new object();
        private readonly Queue<T> _queuedItems = new Queue<T>();
        private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
        private readonly IProducer<T> _producer;
        private readonly IConsumer<T> _consumer;
        private volatile bool _ending = false;
        private Thread _workerThread;

        public Worker(IProducer<T> producer, IConsumer<T> consumer)
        {
            _producer = producer;
            _consumer = consumer;
        }

        public void Start(ThreadPriority priority)
        {
            _producer.ItemProduced += Producer_ItemProduced;
            _ending = false;

            // start a new thread
            _workerThread = new Thread(new ThreadStart(WorkerLoop));
            _workerThread.IsBackground = true;
            _workerThread.Priority = priority;
            _workerThread.Start();
        }

        public void Stop()
        {
            _producer.ItemProduced -= Producer_ItemProduced;
            _ending = true;

            // signal the consumer, in case it is idle
            _itemReadyEvt.Set();
            _workerThread.Join();
        }

        private void Producer_ItemProduced(object sender, ProducedItemEventArgs<T> e)
        {
            lock (_lock) { _queuedItems.Enqueue(e.Item); }

            // notify consumer thread
            _itemReadyEvt.Set();
        }

        // Returns false when the queue is empty, so the caller never
        // re-processes a stale item.
        private bool TryDequeue(out T item)
        {
            lock (_lock)
            {
                if (_queuedItems.Count > 0)
                {
                    item = _queuedItems.Dequeue();
                    return true;
                }
            }
            item = default(T);
            return false;
        }

        private void WorkerLoop()
        {
            while (true)
            {
                // read the flag before draining, so that items queued
                // before Stop() was called are still consumed
                bool ending = _ending;

                T singleItem;
                while (TryDequeue(out singleItem))
                {
                    try
                    {
                        _consumer.ConsumeItem(singleItem);
                    }
                    catch (Exception)
                    {
                        // handle exception, fire an event
                        // or something. Otherwise this
                        // worker thread will die and you
                        // will have no idea what happened
                    }
                }

                if (ending) { break; }

                // block until the producer (or Stop) signals
                _itemReadyEvt.WaitOne();
            }
        } // WorkerLoop
    } // Worker

That is the general idea; it may need some additional tweaks.

To use it, you need your classes to implement these two interfaces:

    // MailCreator implements IProducer<IMail>, MailSender implements IConsumer<IMail>
    MailCreator mailCreator = new MailCreator();
    IConsumer<IMail> mailSender = new MailSender();

    Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
    worker.Start(ThreadPriority.Normal);

    // produce an item - worker will add it to the
    // queue and signal the background thread
    mailCreator.CreateSomeMail();

    // following line will block this (calling) thread
    // until all items are consumed
    worker.Stop();

The nice thing about this approach:

  • you can have as many workers as you like
  • multiple workers can accept items from the same producer
  • multiple workers can dispatch items to the same consumer (although this means you need to make sure the consumer is implemented in a thread-safe manner)
+1

If you have .NET 4.0, use BlockingCollection. It handles all the mess for you, including the end condition, via the CompleteAdding method.

If you have an earlier .NET version, then upgrade (i.e. I'm too lazy to explain how to implement something that has already been done for you).
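A minimal sketch of how the BlockingCollection suggestion could map onto the question's scenarios. The class name `BlockingCollectionDemo` and the use of strings as items are assumptions made for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    // Feeds every element of `source` through a BlockingCollection and
    // returns what the consumer received.
    public static List<string> Run(IEnumerable<string> source)
    {
        var consumed = new List<string>();
        using (var queue = new BlockingCollection<string>())
        {
            var consumer = Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding() was called and the queue is drained.
                foreach (string item in queue.GetConsumingEnumerable())
                {
                    consumed.Add(item);
                }
            });

            foreach (string item in source)
            {
                queue.Add(item);
            }
            queue.CompleteAdding();   // no more items will ever arrive

            consumer.Wait();
        }
        return consumed;
    }
}
```

With an empty `source`, `CompleteAdding()` is called right away and the consumer leaves its loop without processing anything, which covers the "empty target group" case without a separate `Runable` flag.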

EDIT: I don't think your problem warrants threading at all. Just create all your emails in advance, and then sleep until the appointed time.

+2

What you want can be done with condition variables. I'll put together a pseudocode example; it should not be too hard to implement.

One thread has something along these lines:

    while(run)
        compose message
        conditionvariable.lock()
        add message to queue
        conditionvariable.notifyOne()
        conditionvariable.release()

While the other thread has something along these lines

    while(threadsafe_do_run())
        conditionvariable.lock()
        while threadsafe_queue_empty()
            conditionvariable.wait()
        msg = queue.pop()
        if msg == "die"
            set_run(false)
        conditionvariable.release()
        send msg

So, if you are not going to produce any messages, push a die message. Do the same when all messages have been processed.

do_run() and queue_empty() should check their state in a thread-safe way, using appropriate locks.

wait() returns when notifyOne() is called, and by then the queue holds a message to send. In most frameworks the condition variable already owns the lock; in .NET you may need to add the lock statement yourself.
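In .NET, the closest built-in equivalent of a condition variable is `Monitor.Wait` / `Monitor.Pulse` used inside a `lock`. The sketch below is one way the pseudocode could translate; `MonitorQueue` is a hypothetical name, and it swaps the "die" message for a completion flag so that any item type can be queued.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public class MonitorQueue<T>
{
    private readonly object _sync = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private bool _completed;

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _items.Enqueue(item);
            Monitor.Pulse(_sync);       // the notifyOne() of the pseudocode
        }
    }

    // Replaces the "die" message: no more items will be added.
    public void CompleteAdding()
    {
        lock (_sync)
        {
            _completed = true;
            Monitor.PulseAll(_sync);    // wake every waiting consumer
        }
    }

    // Blocks while the queue is empty and adding is not complete.
    // Returns false only when the queue is empty and no more items will come.
    public bool TryDequeue(out T item)
    {
        lock (_sync)
        {
            while (_items.Count == 0 && !_completed)
            {
                Monitor.Wait(_sync);    // releases the lock while waiting
            }
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }
}
```

A consumer can then run `while (queue.TryDequeue(out item)) { ... }` much like in the question, but without any `Thread.Sleep` polling.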

+1

I wrote a simple example that works fine for me and should suit your scenarios. Whether the consumer keeps running depends on how the running variable is set, but you can easily change that to a more complex condition, for example "no mail exists, but someone said I should wait for more".

    using System;
    using System.Collections.Generic;
    using System.Threading;

    public class MailSystem
    {
        private readonly Queue<Mail> mailQueue = new Queue<Mail>();
        private volatile bool running;   // written by one thread, read by another
        private Thread consumerThread;

        public static void Main(string[] args)
        {
            MailSystem mailSystem = new MailSystem();
            mailSystem.StartSystem();
        }

        public void StartSystem()
        {
            // init consumer
            running = true;
            consumerThread = new Thread(ProcessMails);
            consumerThread.Start();

            // add some mails
            AddMail(new Mail("Mail 1"));
            AddMail(new Mail("Mail 2"));
            AddMail(new Mail("Mail 3"));
            AddMail(new Mail("Mail 4"));

            Console.WriteLine("producer finished, hit enter to stop consumer");
            // wait for user interaction
            Console.ReadLine();

            // exit the consumer
            running = false;
            consumerThread.Join();
            Console.WriteLine("exited");
        }

        private void AddMail(Mail mail)
        {
            // Queue<T> is not thread-safe, so guard every access
            lock (mailQueue) { mailQueue.Enqueue(mail); }
        }

        private void ProcessMails()
        {
            while (running)
            {
                Mail mail = null;
                lock (mailQueue)
                {
                    if (mailQueue.Count > 0) { mail = mailQueue.Dequeue(); }
                }

                if (mail != null)
                {
                    Console.WriteLine(mail.Text);
                    Thread.Sleep(2000);
                }
                else
                {
                    // idle: poll again shortly instead of spinning at full speed
                    Thread.Sleep(50);
                }
            }
        }
    }

    internal class Mail
    {
        public string Text { get; set; }

        public Mail(string text)
        {
            Text = text;
        }
    }
+1

I would use one thread to do the whole job, that is, generating the mail body and sending it, simply because generating the mail body won't take long, whereas sending the email will.

Also, if you use the SMTP server that comes with Windows, you can simply drop the email in the queue folder, and the SMTP server will take care of sending emails.

So you can start several threads (keeping a cap on the number), where each thread does the whole job. If you are working with a set of jobs (the data), you can parallelize over the data: split it into chunks that match, for example, the number of cores on the system, and fire off the jobs (threads).

Using Tasks will make all of this a lot simpler whichever way you go: two threads to send one email, or one thread to do the whole job with multiple threads doing multiple jobs in parallel.
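As a rough illustration of the "one thread per whole job, capped thread count" variant, the Task Parallel Library's `Parallel.ForEach` can take a `MaxDegreeOfParallelism` limit. The helper name `ParallelJobs.Run` and its parameters are hypothetical; the `job` delegate stands in for "generate the mail body and send it".

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelJobs
{
    // Runs `job` once per input, on at most `maxThreads` threads at a time,
    // and returns the results in no particular order.
    public static List<TResult> Run<TInput, TResult>(
        IEnumerable<TInput> inputs, Func<TInput, TResult> job, int maxThreads)
    {
        var results = new ConcurrentBag<TResult>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };

        Parallel.ForEach(inputs, options, input =>
        {
            // Each iteration does the whole job for one input.
            results.Add(job(input));
        });

        return new List<TResult>(results);
    }
}
```

`Parallel.ForEach` returns only after every job has finished, so no separate completion signal is needed in this variant.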

0

Source: https://habr.com/ru/post/907299/

