Thread Safety in C # Parallel Queue

I have a thread MessagesManagerto which different threads can send messages, and then this thread MessagesManageris responsible for publishing these messages inside SendMessageToTcpIP()(the starting point of the thread MessagesManager).

class MessagesManager : IMessageNotifier
{
    //private
    private readonly AutoResetEvent _waitTillMessageQueueEmptyARE = new AutoResetEvent(false);
    private ConcurrentQueue<string> MessagesQueue = new ConcurrentQueue<string>(); 

    public void PublishMessage(string Message)
    {
        MessagesQueue.Enqueue(Message);
        _waitTillMessageQueueEmptyARE.Set();
    }

    public void SendMessageToTcpIP()
    {
        //keep waiting till a new message comes
        while (MessagesQueue.Count() == 0)
        {
            _waitTillMessageQueueEmptyARE.WaitOne();
        }

        //Copy the Concurrent Queue into a local queue - keep dequeuing the item once it is inserts into the local Queue
        Queue<string> localMessagesQueue = new Queue<string>();

        while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //Use the Local Queue for further processing
        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }
    }
}

Different threads (3-4) send their message by calling PublishMessage(string Message)(using the same MessageManager). When a message appears, I push this message in a parallel queue and notifies SendMessageToTcpIP()when installing _waitTillMessageQueueEmptyARE.Set();. Inside SendMessageToTcpIP(), I copy a message from a parallel queue inside a local queue, and then publish one by one.

: enqueuing dequeuing ? - - ?

+6
4

, , , .NET , " ", BlockingCollection. :

class MessagesManager : IDisposable {
    // note that your ConcurrentQueue is still in play, passed to constructor
    private readonly BlockingCollection<string> MessagesQueue = new BlockingCollection<string>(new ConcurrentQueue<string>());

    public MessagesManager() {
        // start consumer thread here
        new Thread(SendLoop) {
            IsBackground = true
        }.Start();
    }

    public void PublishMessage(string Message) {
        // no need to notify here, will be done for you
        MessagesQueue.Add(Message);
    }

    private void SendLoop() {
        // this blocks until new items are available
        foreach (var item in MessagesQueue.GetConsumingEnumerable()) {
            // ensure that you handle exceptions here, or whole thing will break on exception
            TcpIpMessageSenderClient.ConnectAndSendMessage(item.PadRight(80, ' '));
            Thread.Sleep(2000); // only if you are sure this is required 
        }
    }

    public void Dispose() {            
        // this will "complete" GetConsumingEnumerable, so your thread will complete
        MessagesQueue.CompleteAdding();
        MessagesQueue.Dispose();
    }
}
+3

.NET ActionBlock <T> , . .

:

//In an initialization function
ActionBlock<string> _hmiAgent=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage(msg.PadRight(80, ' '));
        await Task.Delay(2000);
);

//In some other thread ...
foreach ( ....)
{
    _hmiAgent.Post(someMessage);
}

// When the application closes

_hmiAgent.Complete();
await _hmiAgent.Completion;

ActionBlock - , , , . . , , , .

, , TransformBlock < , Tout > . , , , :

//In an initialization function
TransformBlock<string> _hmiAgent=new TransformBlock<string,string>(
    msg=>msg.PadRight(80, ' '));

ActionBlock<string> _tcpBlock=new ActionBlock<string>(async msg=>{
        TcpIpMessageSenderClient.ConnectAndSendMessage());
        await Task.Delay(2000);
);

var linkOptions=new DataflowLinkOptions{PropagateCompletion = true};
_hmiAgent.LinkTo(_tcpBlock);

    _hmiAgent.Post(someMessage);

, _tcpBlock:

    _hmiAgent.Complete();
    await _tcpBlock.Completion;

Visual Studio 2015+ TPL

Bar Arnon TPL Dataflow - , , , .

+2

, :

class MessagesManager {
    private readonly AutoResetEvent messagesAvailableSignal = new AutoResetEvent(false);
    private readonly ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();

    public void PublishMessage(string Message) {
        messageQueue.Enqueue(Message);
        messagesAvailableSignal.Set();
    }

    public void SendMessageToTcpIP() {
        while (true) {
            messagesAvailableSignal.WaitOne();
            while (!messageQueue.IsEmpty) {
                string message;
                if (messageQueue.TryDequeue(out message)) {
                    TcpIpMessageSenderClient.ConnectAndSendMessage(message.PadRight(80, ' '));
                }
            }
        }
    }
}

, :

  • : ,
  • 2000ms
0

, ConcurrentQueue AutoResetEvent . , .

, , SendMessageToTcpIP - .
- :

while (!MessagesQueue.IsEmpty)
        {
            string message;
            bool isRemoved = MessagesQueue.TryDequeue(out message);
            if (isRemoved)
                localMessagesQueue.Enqueue(message);
        }

        //<<--- what happens if another thread enqueues a message here?

        while (localMessagesQueue.Count() != 0)
        {
            TcpIpMessageSenderClient.ConnectAndSendMessage(localMessagesQueue.Dequeue().PadRight(80, ' '));
            Thread.Sleep(2000);
        }

In addition, it AutoResetEventis an extremely heavy object. it uses a kernel object to synchronize threads. each call is a system call, which can be costly. consider using a user-mode synchronization object (doesn't .net provide any kind of condition variable?)

0
source

Source: https://habr.com/ru/post/1016169/


All Articles