I agree with Serge Ballesta. Within the same process, you can send and receive the address of an object through an anonymous pipe.
Since the write() system call is guaranteed to be atomic when the message size is below PIPE_BUF (usually 4096 bytes), writes from several producer threads will not garble each other's object addresses (8 bytes for 64-bit applications).
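To make the trick concrete before the full demo, here is a minimal sketch of my own (not part of the demo below; error checks omitted) that round-trips one heap object's address through a pipe:

#include <unistd.h>
#include <cstdio>
#include <string>

int main() {
    int fd[2];
    pipe(fd);                                  // fd[0]: read end, fd[1]: write end
    std::string* obj = new std::string("hello");
    write(fd[1], &obj, sizeof(obj));           // send the 8-byte address; atomic since 8 < PIPE_BUF
    std::string* received = NULL;
    read(fd[0], &received, sizeof(received));  // receive the same address
    printf("%s\n", received->c_str());         // prints "hello"
    delete received;
    return 0;
}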
Talk is cheap, so here is the demo code for Linux (defensive checks and error handlers are omitted for simplicity). Just copy and paste it into pipe_ipc_demo.cc, then compile and run the test.
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <errno.h>
#include <pthread.h>
#include <sys/select.h>
#include <string>
#include <list>

template<class T> class MPSCQ { // pipe based Multi Producer Single Consumer Queue
public:
    MPSCQ();
    ~MPSCQ();
    int producerPush(const T* t);
    T* consumerPoll(double timeout = 1.0);
private:
    void _consumeFd();
    int _selectFdConsumer(double timeout);
    T* _popFront();
private:
    int _fdProducer;
    int _fdConsumer;
    char* _consumerBuf;
    std::string* _partial;
    std::list<T*>* _list;
    static const int _PTR_SIZE;
    static const int _CONSUMER_BUF_SIZE;
};

template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*);
template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024;

template<class T> MPSCQ<T>::MPSCQ() : _fdProducer(-1), _fdConsumer(-1) {
    _consumerBuf = new char[_CONSUMER_BUF_SIZE];
    _partial = new std::string;  // for holding partial pointer address
    _list = new std::list<T*>;   // unconsumed T* cache
    int fd_[2];
    pipe(fd_);                   // error handler omitted
    _fdConsumer = fd_[0];
    _fdProducer = fd_[1];
}

template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ }

template<class T> int MPSCQ<T>::producerPush(const T* t) {
    // an 8-byte write is atomic because it is far below PIPE_BUF
    return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE);
}

template<class T> T* MPSCQ<T>::consumerPoll(double timeout) {
    T* t = _popFront();
    if (t != NULL) {
        return t;
    }
    if (_selectFdConsumer(timeout) <= 0) {  // timeout or error
        return NULL;
    }
    _consumeFd();
    return _popFront();
}

template<class T> void MPSCQ<T>::_consumeFd() {
    // prepend the partial pointer bytes left over from the previous read
    memcpy(_consumerBuf, _partial->data(), _partial->length());
    ssize_t r = read(_fdConsumer, _consumerBuf + _partial->length(),
                     _CONSUMER_BUF_SIZE - _partial->length());
    if (r <= 0) {  // EOF or error, error handler omitted
        return;
    }
    const char* p = _consumerBuf;
    int remaining_len_ = _partial->length() + r;
    T* t;
    while (remaining_len_ >= _PTR_SIZE) {
        memcpy(&t, p, _PTR_SIZE);
        _list->push_back(t);
        remaining_len_ -= _PTR_SIZE;
        p += _PTR_SIZE;
    }
    *_partial = std::string(p, remaining_len_);
}

template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) {
    int r;
    int nfds_ = _fdConsumer + 1;
    fd_set readfds_;
    struct timeval timeout_;
    int64_t usec_ = timeout * 1000000.0;
    while (true) {
        timeout_.tv_sec = usec_ / 1000000;
        timeout_.tv_usec = usec_ % 1000000;
        FD_ZERO(&readfds_);
        FD_SET(_fdConsumer, &readfds_);
        r = select(nfds_, &readfds_, NULL, NULL, &timeout_);
        if (r < 0 && errno == EINTR) {  // interrupted by a signal, retry
            continue;
        }
        return r;
    }
}

template<class T> T* MPSCQ<T>::_popFront() {
    if (!_list->empty()) {
        T* t = _list->front();
        _list->pop_front();
        return t;
    } else {
        return NULL;
    }
}

// = = = = = test code below = = = = =

#define _LOOP_CNT 5000000
#define _ONE_MILLION 1000000
#define _PRODUCER_THREAD_NUM 2

struct TestMsg {  // all public
    int _threadId;
    int _msgId;
    int64_t _val;
    TestMsg(int thread_id, int msg_id, int64_t val) :
        _threadId(thread_id), _msgId(msg_id), _val(val) { };
};

static MPSCQ<TestMsg> _QUEUE;
static int64_t _SUM = 0;

void* functor_producer(void* arg) {
    int my_thr_id_ = (int)pthread_self();
    TestMsg* msg_;
    for (int i = 0; i <= _LOOP_CNT; ++ i) {
        if (i == _LOOP_CNT) {
            msg_ = new TestMsg(my_thr_id_, i, -1);  // negative _val is the stop signal
        } else {
            msg_ = new TestMsg(my_thr_id_, i, i + 1);
        }
        _QUEUE.producerPush(msg_);
    }
    return NULL;
}

void* functor_consumer(void* arg) {
    int msg_cnt_ = 0;
    int stop_cnt_ = 0;
    TestMsg* msg_;
    while (true) {
        if ((msg_ = _QUEUE.consumerPoll()) == NULL) {
            continue;
        }
        int64_t val_ = msg_->_val;
        delete msg_;
        if (val_ <= 0) {  // stop signal from one producer
            if ((++ stop_cnt_) >= _PRODUCER_THREAD_NUM) {
                printf("All done, _SUM=%ld\n", _SUM);
                break;
            }
        } else {
            _SUM += val_;
            if ((++ msg_cnt_) % _ONE_MILLION == 0) {
                printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM);
            }
        }
    }
    return NULL;
}

int main(int argc, char* const* argv) {
    pthread_t consumer_;
    pthread_create(&consumer_, NULL, functor_consumer, NULL);
    pthread_t producers_[_PRODUCER_THREAD_NUM];
    for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
        pthread_create(&producers_[i], NULL, functor_producer, NULL);
    }
    for (int i = 0; i < _PRODUCER_THREAD_NUM; ++ i) {
        pthread_join(producers_[i], NULL);
    }
    pthread_join(consumer_, NULL);
    return 0;
}
And here is how to run the test (the expected sum is 2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000):
$ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread
$ ./pipe_ipc_demo
The technique presented here is used in our production applications. One typical use case: the consumer thread acts as a logger, and worker threads can write log messages almost asynchronously. Yes, almost: worker threads can occasionally block in write() when the pipe is full, and this is a reliable congestion-control mechanism provided by the OS. A sketch of such a logger follows.
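As an illustration of that logging pattern, here is a hedged sketch of my own built on the MPSCQ class above; LogMsg, log_async, and functor_logger are invented names, not taken from our production code:

struct LogMsg {
    std::string _text;
    LogMsg(const std::string& text) : _text(text) { }
};

static MPSCQ<LogMsg> _LOG_QUEUE;

// worker threads: nearly asynchronous, may block in write() only when the pipe is full
void log_async(const std::string& text) {
    _LOG_QUEUE.producerPush(new LogMsg(text));
}

// dedicated consumer thread: the only place that touches the log destination
void* functor_logger(void* arg) {
    while (true) {
        LogMsg* msg_ = _LOG_QUEUE.consumerPoll();
        if (msg_ == NULL) {  // poll timed out, try again
            continue;
        }
        fprintf(stderr, "%s\n", msg_->_text.c_str());
        delete msg_;
    }
    return NULL;
}

Because every producerPush() boils down to a single sub-PIPE_BUF write(), any number of worker threads can call log_async() concurrently without a user-space lock.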