Do anonymous pipes provide a memory barrier for point-to-point communication?

For example, say I allocate a struct with new and write the pointer into the write end of an anonymous pipe.

If I read the pointer from the corresponding read end, am I guaranteed to see the “right” contents of the structure?

Also of interest is whether socketpair() results on Unix and TCP loopback self-connections on Windows have the same guarantees.

The context is a server design that centralizes event dispatch with select/epoll.

+5
6 answers

For example, say I allocate a struct with new and write the pointer into the write end of an anonymous pipe.

If I read the pointer from the corresponding read end, am I guaranteed to see the “right” contents of the structure?

No. There is no guarantee that the writing CPU will have flushed the write out of its cache and made it visible to the other CPU that might do the read.

Also of interest is whether socketpair() results on Unix and TCP loopback self-connections on Windows have the same guarantees.

No.

+4

In practice, the write() call, being a system call, will end up locking one or more data structures in the kernel, which should take care of the reordering issue. For example, POSIX requires subsequent reads to see the data written before their call, which implies some locking (or some kind of acquire/release semantics) all by itself.

Whether that is part of the formal specification of the calls, though, it probably is not.
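If you prefer not to rely on that implicit ordering alone, here is a minimal sketch (my own illustration, not from the answer) that makes the acquire/release intent explicit around the pipe transfer. The fences document the intent; in practice the cross-thread visibility still comes from the write()/read() pair itself:

    // Sketch: publish a struct by passing its address through a pipe,
    // with explicit fences marking the release/acquire points.
    #include <unistd.h>
    #include <atomic>

    struct Msg { int payload; };

    void send_msg(int write_fd, Msg* m) {
        m->payload = 42;                                      // fill the struct
        std::atomic_thread_fence(std::memory_order_release);  // publish before handing off
        (void)write(write_fd, &m, sizeof m);                  // ship the pointer bytes
    }

    Msg* recv_msg(int read_fd) {
        Msg* m = nullptr;
        (void)read(read_fd, &m, sizeof m);                    // obtain the pointer bytes
        std::atomic_thread_fence(std::memory_order_acquire);  // ...before dereferencing
        return m;
    }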

+2

A pointer is just a memory address, so provided you are in the same process, the pointer will be valid in the receiving thread and will point to the same structure. If you are in different processes, at best you will immediately get a memory fault; at worst you will read (or write) random memory, which is essentially Undefined Behaviour.

Will you read the right content? Neither better nor worse than if your pointer were in a static variable shared by both threads: you still have to do some synchronization if you want consistency.

Does it make any difference whether the address is handed over through static memory (shared by both threads), anonymous pipes, socket pairs, TCP loopback, etc.? No: all those channels transfer bytes, so if you pass a memory address, you will get your memory address back. What is left to you then is synchronization, because here you are only sharing a memory address (a minimal sketch follows the list below).

If you do not use any other synchronization, anything could happen (did I mention Undefined Behaviour?):

  • the reading thread can access the memory before it has been written by the writing thread, seeing stale data
  • if you forget to declare the structure members as volatile, the reading thread may keep using cached values, again seeing stale data
  • the reading thread can read partially written data, i.e. incoherent data
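To make that concrete, here is a minimal sketch (my own, assuming both threads live in one process and share a mutex): the pipe only carries the address, and the mutex supplies the consistency.

    // Sketch: the pipe transfers the pointer bytes; the shared mutex is what
    // actually guarantees that the consumer sees a consistent Job.
    #include <unistd.h>
    #include <mutex>

    struct Job { int state; };

    static std::mutex job_mutex;   // shared by both threads, like a static variable

    void producer(int write_fd) {
        Job* job = new Job;
        {
            std::lock_guard<std::mutex> guard(job_mutex);
            job->state = 1;                       // fill the struct under the lock
        }
        (void)write(write_fd, &job, sizeof job);  // hand over only the address
    }

    void consumer(int read_fd) {
        Job* job = nullptr;
        (void)read(read_fd, &job, sizeof job);    // same address, same process
        int s;
        {
            std::lock_guard<std::mutex> guard(job_mutex);
            s = job->state;                       // read the contents under the lock
        }
        (void)s;
        delete job;
    }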
+2

An interesting question, and so far there is only one correct answer, from Cornstalks.

There are no guarantees, even within the same (multi-threaded) process, because the pointer and the data follow different paths to reach their destination. Implicit acquire/release guarantees do not apply, since the struct data cannot piggyback on the pointer through the cache, and formally you are dealing with consume ordering (a data dependency).

However, looking at how the pointer itself and the struct data reach the second thread (through the pipe and through the memory cache, respectively), there is a real chance that this mechanism will not hurt you. Sending a pointer to a peer thread takes 3 system calls (write() in the sending thread, select() and read() in the receiving thread), which is (relatively) expensive, and by the time the pointer value is available in the receiving thread, the struct data has most likely arrived long before it.

Note that this is just an observation; the mechanism is still incorrect.

+1
source

I suppose your case can be reduced to this model with two threads:

    #include <atomic>
    #include <cassert>

    int data = 0;
    std::atomic<int*> atomicPtr{nullptr};
    //...

    void thread1() {
        data = 42;
        atomicPtr.store(&data, std::memory_order_release);
    }

    void thread2() {
        int* ptr = nullptr;
        while (!ptr) ptr = atomicPtr.load(std::memory_order_consume);
        assert(*ptr == 42);
    }

Since you have 2 processes you cannot use a single atomic variable, but since you specified Windows you can omit the atomicPtr.load(std::memory_order_consume) part on the consuming side, because, AFAIK, all the architectures Windows runs on guarantee that this load will be correct without any barrier on the load side. In fact, I think there are not many architectures where that instruction would not be a NO-OP (I have only heard of DEC Alpha).
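For completeness, a small driver for the model above (my addition; it assumes the snippet is compiled in the same translation unit):

    #include <thread>

    int main() {
        std::thread consumer(thread2);   // spins until it observes the pointer
        std::thread producer(thread1);   // stores 42, then publishes &data
        producer.join();
        consumer.join();                 // the assert in thread2 should not fire
        return 0;
    }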

0

I agree with Serge Ballesta. Within the same process, you can send and receive an object's address through an anonymous pipe.

Since the write() system call is guaranteed to be atomic when the message size is below PIPE_BUF (usually 4096 bytes), multiple producer threads will not garble each other's object addresses (8 bytes for 64-bit applications).
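As a small sanity check (my own addition, assuming Linux, where PIPE_BUF is available from <limits.h>), you can assert at compile time that a pointer-sized message stays below that limit:

    #include <limits.h>   // PIPE_BUF (4096 on Linux)

    static_assert(sizeof(void*) <= PIPE_BUF,
                  "a single pointer-sized write() is atomic on this platform");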

Talk is cheap, so here is the demo code for Linux (defensive code and error handlers are omitted for simplicity). Just copy and paste it into pipe_ipc_demo.cc, then compile and run the test.

    #include <unistd.h>
    #include <string.h>
    #include <stdio.h>
    #include <stdint.h>
    #include <errno.h>
    #include <pthread.h>
    #include <sys/select.h>
    #include <string>
    #include <list>

    template<class T> class MPSCQ {  // pipe-based Multi Producer Single Consumer Queue
    public:
        MPSCQ();
        ~MPSCQ();
        int producerPush(const T* t);
        T* consumerPoll(double timeout = 1.0);

    private:
        void _consumeFd();
        int _selectFdConsumer(double timeout);
        T* _popFront();

    private:
        int _fdProducer;
        int _fdConsumer;
        char* _consumerBuf;
        std::string* _partial;
        std::list<T*>* _list;
        static const int _PTR_SIZE;
        static const int _CONSUMER_BUF_SIZE;
    };

    template<class T> const int MPSCQ<T>::_PTR_SIZE = sizeof(void*);
    template<class T> const int MPSCQ<T>::_CONSUMER_BUF_SIZE = 1024;

    template<class T> MPSCQ<T>::MPSCQ() : _fdProducer(-1), _fdConsumer(-1) {
        _consumerBuf = new char[_CONSUMER_BUF_SIZE];
        _partial = new std::string;   // for holding a partial pointer address
        _list = new std::list<T*>;    // unconsumed T* cache
        int fd_[2];
        int r = pipe(fd_);            // error check omitted
        (void)r;
        _fdConsumer = fd_[0];
        _fdProducer = fd_[1];
    }

    template<class T> MPSCQ<T>::~MPSCQ() { /* omitted */ }

    template<class T> int MPSCQ<T>::producerPush(const T* t) {
        return t == NULL ? 0 : write(_fdProducer, &t, _PTR_SIZE);
    }

    template<class T> T* MPSCQ<T>::consumerPoll(double timeout) {
        T* t = _popFront();
        if (t != NULL) {
            return t;
        }
        if (_selectFdConsumer(timeout) <= 0) {   // timeout or error
            return NULL;
        }
        _consumeFd();
        return _popFront();
    }

    template<class T> void MPSCQ<T>::_consumeFd() {
        memcpy(_consumerBuf, _partial->data(), _partial->length());
        ssize_t r = read(_fdConsumer, _consumerBuf + _partial->length(),
                         _CONSUMER_BUF_SIZE - _partial->length());
        if (r <= 0) {   // EOF or error, error handler omitted
            return;
        }
        const char* p = _consumerBuf;
        int remaining_len_ = _partial->length() + r;
        T* t;
        while (remaining_len_ >= _PTR_SIZE) {
            memcpy(&t, p, _PTR_SIZE);
            _list->push_back(t);
            remaining_len_ -= _PTR_SIZE;
            p += _PTR_SIZE;
        }
        *_partial = std::string(p, remaining_len_);
    }

    template<class T> int MPSCQ<T>::_selectFdConsumer(double timeout) {
        int r;
        int nfds_ = _fdConsumer + 1;
        fd_set readfds_;
        struct timeval timeout_;
        int64_t usec_ = timeout * 1000000.0;
        while (true) {
            timeout_.tv_sec = usec_ / 1000000;
            timeout_.tv_usec = usec_ % 1000000;
            FD_ZERO(&readfds_);
            FD_SET(_fdConsumer, &readfds_);
            r = select(nfds_, &readfds_, NULL, NULL, &timeout_);
            if (r < 0 && errno == EINTR) {
                continue;
            }
            return r;
        }
    }

    template<class T> T* MPSCQ<T>::_popFront() {
        if (!_list->empty()) {
            T* t = _list->front();
            _list->pop_front();
            return t;
        } else {
            return NULL;
        }
    }

    // = = = = = test code below = = = = =
    #define _LOOP_CNT            5000000
    #define _ONE_MILLION         1000000
    #define _PRODUCER_THREAD_NUM 2

    struct TestMsg {   // all public
        int     _threadId;
        int     _msgId;
        int64_t _val;
        TestMsg(int thread_id, int msg_id, int64_t val) :
            _threadId(thread_id), _msgId(msg_id), _val(val) { }
    };

    static MPSCQ<TestMsg> _QUEUE;
    static int64_t _SUM = 0;

    void* functor_producer(void* arg) {
        int my_thr_id_ = (int)pthread_self();
        TestMsg* msg_;
        for (int i = 0; i <= _LOOP_CNT; ++i) {
            if (i == _LOOP_CNT) {
                msg_ = new TestMsg(my_thr_id_, i, -1);   // -1 marks "this producer is done"
            } else {
                msg_ = new TestMsg(my_thr_id_, i, i + 1);
            }
            _QUEUE.producerPush(msg_);
        }
        return NULL;
    }

    void* functor_consumer(void* arg) {
        int msg_cnt_ = 0;
        int stop_cnt_ = 0;
        TestMsg* msg_;
        while (true) {
            if ((msg_ = _QUEUE.consumerPoll()) == NULL) {
                continue;
            }
            int64_t val_ = msg_->_val;
            delete msg_;
            if (val_ <= 0) {
                if ((++stop_cnt_) >= _PRODUCER_THREAD_NUM) {
                    printf("All done, _SUM=%ld\n", _SUM);
                    break;
                }
            } else {
                _SUM += val_;
                if ((++msg_cnt_) % _ONE_MILLION == 0) {
                    printf("msg_cnt_=%d, _SUM=%ld\n", msg_cnt_, _SUM);
                }
            }
        }
        return NULL;
    }

    int main(int argc, char* const* argv) {
        pthread_t consumer_;
        pthread_create(&consumer_, NULL, functor_consumer, NULL);
        pthread_t producers_[_PRODUCER_THREAD_NUM];
        for (int i = 0; i < _PRODUCER_THREAD_NUM; ++i) {
            pthread_create(&producers_[i], NULL, functor_producer, NULL);
        }
        for (int i = 0; i < _PRODUCER_THREAD_NUM; ++i) {
            pthread_join(producers_[i], NULL);
        }
        pthread_join(consumer_, NULL);
        return 0;
    }

And here is the test result ( 2 * sum(1..5000000) == (1 + 5000000) * 5000000 == 25000005000000 ):

    $ g++ -o pipe_ipc_demo pipe_ipc_demo.cc -lpthread
    $ ./pipe_ipc_demo    ## output may vary except for the final _SUM
    msg_cnt_=1000000, _SUM=251244261289
    msg_cnt_=2000000, _SUM=1000708879236
    msg_cnt_=3000000, _SUM=2250159002500
    msg_cnt_=4000000, _SUM=4000785160225
    msg_cnt_=5000000, _SUM=6251640644676
    msg_cnt_=6000000, _SUM=9003167062500
    msg_cnt_=7000000, _SUM=12252615629881
    msg_cnt_=8000000, _SUM=16002380952516
    msg_cnt_=9000000, _SUM=20252025092401
    msg_cnt_=10000000, _SUM=25000005000000
    All done, _SUM=25000005000000

The technique presented here is used in our production applications. One typical use is to let the consumer thread act as a logger, so that worker threads can write log messages almost asynchronously. Yes, almost: writer threads can occasionally be blocked in write() when the pipe is full, which is a reliable congestion-control feature provided by the OS.
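For illustration, the logger scenario could be sketched on top of the MPSCQ<T> class above roughly as follows (my own sketch; LogMsg and the FILE handling are assumptions, not code from the application described):

    #include <cstdio>
    #include <string>

    struct LogMsg {
        std::string _text;
        explicit LogMsg(const std::string& text) : _text(text) {}
    };

    static MPSCQ<LogMsg> _LOG_QUEUE;   // reuses the queue class defined above

    // Called from any worker thread: one pointer-sized write() per message,
    // below PIPE_BUF, so messages from different workers never interleave.
    void log_from_worker(const std::string& text) {
        _LOG_QUEUE.producerPush(new LogMsg(text));
    }

    // The single consumer thread owns the output file and the deletes.
    void logger_loop(FILE* out) {
        while (true) {
            LogMsg* msg = _LOG_QUEUE.consumerPoll();   // waits up to the default timeout
            if (msg == NULL) { continue; }
            fprintf(out, "%s\n", msg->_text.c_str());
            delete msg;
        }
    }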

0

Source: https://habr.com/ru/post/1244555/

