Dedicated thread (one thread per connection) with the ability to buffer (C / C++)

My process reads tasks from a single queue, and these tasks need to be sent to multiple destinations. We must maintain order between tasks (i.e. a task that arrived in the queue at 00:00 must be sent before a task that arrived at 00:01), so we cannot use a thread pool. Order must be maintained for each destination.

One solution is to create a dedicated thread for each destination. The main thread reads a task from the queue and, depending on its destination, hands it to the correct thread.

This solution has a problem: if the worker thread is busy, the main thread stays blocked, which makes the system slow. I think each thread needs its own queue: the main thread distributes tasks among the queues, and each worker thread reads its own queue for incoming messages ...

I wanted to share my thinking with the SO community, and I am looking for a C / C++ solution close to this design. Is there a library that implements such a model?

+3
2 answers

"The need to maintain order" all-but-bluntly states that you will perform tasks one at a time no matter how many threads you have. In this case, you are probably best off with one thread serving requests.


Edit:

Something like this (pseudocode):

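// Needs <pthread.h>, <cstdlib> and <vector>; thread_routine is defined elsewhere.
std::vector<pthread_t> threads;

int num_threads = atoi(argv[1]);  // number of worker threads, from the command line
threads.resize(num_threads);

// Start the workers; each one runs thread_routine with no argument.
for (int i = 0; i < num_threads; i++)
    pthread_create(&threads[i], NULL, thread_routine, NULL);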
+2

Give each worker thread its own FIFO queue (e.g. a std::queue) protected by a Mutex; the "master" thread appends tasks to that queue and then signals the worker thread to wake up.

So the main thread would do something like this (pseudocode):

// Main thread: append the task to the chosen worker's list, then wake that worker.
struct WorkerThreadData & workerThread = _workerThreads[threadIndexIWantToSendTo];
workerThread.m_mutex.Lock();
workerThread.m_incomingTasks.push_back(theNewTaskObject);  // the same list the worker pops from
workerThread.m_mutex.Unlock();
workerThread.m_signalMechanism.SignalThreadToWakeUp();  // make sure the worker looks at the task list!

... and the worker thread would do something like this:

struct WorkerThreadData & myData = _workerThreads[myWorkerIndex];
TaskObject * taskObject;
while(1)
{
   myData.m_signalMechanism.WaitForSignal();  // block until the main thread wakes me up

   myData.m_mutex.Lock();
   taskObject = (myData.m_incomingTasks.length() > 0) ? myData.m_incomingTasks.pop_front() : NULL;
   myData.m_mutex.Unlock();

   if (taskObject) 
   {
      taskObject->DoTheWork();
      delete taskObject;
   }
}

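Note that the Mutex is held only while the task list is being touched, not while the task itself runs, so the main thread is never blocked for long.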
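
For readers who prefer the standard library over hand-rolled Mutex and signal classes, the pseudocode above can be sketched roughly as follows with C++11 `std::thread`, `std::mutex` and `std::condition_variable`. This is only a minimal sketch under assumptions that are not in the original answer: the names `Task`, `Worker`, `Dispatcher` and `run_demo` are hypothetical, destinations are identified by a string, and a destination is mapped to a worker by hashing its name.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical task type: any callable taking no arguments.
using Task = std::function<void()>;

// One worker thread with its own FIFO queue. Tasks posted to the same
// worker run in the order in which they were posted.
class Worker {
public:
    Worker() : thread_([this] { run(); }) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();  // the worker drains its queue before it exits
    }

    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

    // Called by the main thread; the lock is held only for the push.
    void post(Task task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stopping and nothing left to do
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();  // run the task outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<Task> tasks_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the members above exist first
};

// Routes every task for a given destination to the same worker, which keeps
// per-destination order. Hashing the destination name is an assumption.
class Dispatcher {
public:
    explicit Dispatcher(std::size_t worker_count) {
        assert(worker_count > 0);
        for (std::size_t i = 0; i < worker_count; ++i)
            workers_.push_back(std::unique_ptr<Worker>(new Worker()));
    }

    void send(const std::string& destination, Task task) {
        std::size_t index = std::hash<std::string>{}(destination) % workers_.size();
        workers_[index]->post(std::move(task));
    }

private:
    std::vector<std::unique_ptr<Worker>> workers_;
};

// Hypothetical demo: post `count` numbered tasks for one destination and
// return the order in which the worker actually ran them.
std::vector<int> run_demo(const std::string& destination, int count) {
    std::vector<int> seen;
    {
        Dispatcher dispatcher(4);
        for (int i = 0; i < count; ++i)
            dispatcher.send(destination, [&seen, i] { seen.push_back(i); });
    }  // destroying the dispatcher joins the workers after the queues drain
    return seen;
}
```

Calling `run_demo("host-a", 1000)` should return the task numbers in the order they were posted, because every task for one destination lands in the same worker's queue. The condition variable's predicate re-checks the queue on every wakeup, so a worker does not miss tasks that were posted while it was busy.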

+4

Source: https://habr.com/ru/post/1791692/

