Are istream and ostream sharing a streambuf mutually thread safe for duplex I/O?

I created a custom streambuf for buffered input/output over a network socket, overriding underflow, overflow, and sync, so that the input side is thread safe with respect to the output side (I have separate internal input and output buffers). This works fine, but I want to use it for full duplex I/O, where one thread can receive while another sends, so I would like to use an istream for the receiving thread and an ostream for the sending thread, both sharing the network streambuf, which abstracts away all the socket details. My question is: do the streambuf members touched by input operations on the istream overlap at all with the streambuf members touched by output operations on the ostream, given that the input and output buffers are separate?

I would much rather be able to do this than pull the socket handling out of my streambuf abstraction so that the socket could be shared between an istream and an ostream with separate streambufs. In that case I would also need two versions of the streambuf: one with a single internal buffer (for use in only an istream or only an ostream) and one with two internal buffers, as I have now, for use in an iostream... which means extra classes and code duplication.
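Roughly, the buffer looks like this (a simplified sketch only, assuming a plain POSIX socket descriptor; duplex_socketbuf and the fixed 4096-byte buffers are just for illustration, not my actual code):

    #include <streambuf>
    #include <unistd.h>   // ::read(), ::write()

    class duplex_socketbuf : public std::streambuf {
    public:
        explicit duplex_socketbuf(int fd) : fd_(fd) {
            setg(in_, in_, in_);               // get area starts empty
            setp(out_, out_ + sizeof out_);    // put area starts fully available
        }

    protected:
        // Refill the get area (input buffer) from the socket.
        int_type underflow() override {
            if (gptr() < egptr())
                return traits_type::to_int_type(*gptr());
            ssize_t n = ::read(fd_, in_, sizeof in_);
            if (n <= 0)
                return traits_type::eof();
            setg(in_, in_, in_ + n);
            return traits_type::to_int_type(*gptr());
        }

        // Flush the put area (output buffer) to the socket, then store ch.
        int_type overflow(int_type ch = traits_type::eof()) override {
            if (sync() == -1)
                return traits_type::eof();
            if (!traits_type::eq_int_type(ch, traits_type::eof())) {
                *pptr() = traits_type::to_char_type(ch);
                pbump(1);
            }
            return traits_type::not_eof(ch);
        }

        // Write out everything between pbase() and pptr().
        int sync() override {
            for (char* p = pbase(); p < pptr(); ) {
                ssize_t n = ::write(fd_, p, pptr() - p);
                if (n <= 0)
                    return -1;
                p += n;
            }
            setp(out_, out_ + sizeof out_);
            return 0;
        }

    private:
        int  fd_;
        char in_[4096];    // internal input buffer  (get area only)
        char out_[4096];   // internal output buffer (put area only)
    };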

+7
3 answers

There are no special rules for std::streambuf (or std::basic_streambuf<...>) that give it more guarantees than usual. That is, you can have multiple threads reading the object's state at any time, but if one thread modifies the object's state, no other thread may access the object. Reading and writing characters both change the stream buffer's state, so from a formal point of view you cannot use them without external synchronization.
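To make that concrete, external synchronization would look something like this (a sketch only; the mutex and the two helper functions are made up for illustration). Note that a lock this coarse serializes the two directions, and a read that blocks on the network stalls the writer, which is presumably what you want to avoid:

    #include <iostream>
    #include <mutex>
    #include <string>

    std::mutex buf_mutex;   // guards every access to the shared stream buffer

    void send_line(std::ostream& out, const std::string& line) {
        std::lock_guard<std::mutex> lock(buf_mutex);   // serialize with readers
        out << line << '\n' << std::flush;
    }

    bool receive_line(std::istream& in, std::string& line) {
        std::lock_guard<std::mutex> lock(buf_mutex);   // serialize with writers
        return static_cast<bool>(std::getline(in, line));
    }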

Internally, the two buffers are entirely separate and have nothing to do with each other. Operations on the stream buffer change them in a fairly structured way, and I cannot imagine any implementation having an explicit interaction between the two sets of pointers. That is, in practice I do not think any synchronization between reading and writing is required. However, I had not considered before that the two sets of buffer pointers may actually share the same cache lines, which could at least cause performance problems. I do not think it should cause correctness problems.

The only resource that may be shared between the two stream buffers is the std::locale object, which is meant to be immutable. Moreover, std::streambuf does not use this object itself: it is your stream buffer that might use some of its facets (for example, the std::codecvt<...> facet). Since the locale is changed by a call to the virtual function imbue(), you can intercept that change and perform whatever synchronization is needed if your stream buffer uses the locale.
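For example, a sketch of intercepting imbue() in a derived buffer that caches the locale (the class name and the member mutex are illustrative, assuming your buffer actually uses the locale somewhere):

    #include <locale>
    #include <mutex>
    #include <streambuf>

    class my_netbuf : public std::streambuf {
    protected:
        void imbue(const std::locale& loc) override {
            std::lock_guard<std::mutex> lock(mtx_);   // keep readers/writers out
            loc_ = loc;                               // e.g. re-cache a codecvt facet here
        }

    private:
        std::mutex  mtx_;   // guards loc_ against concurrent character I/O
        std::locale loc_;   // only needed if the buffer uses locale facets
    };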

Summarizing, the standard does not guarantee that using concurrent threads to read and write through the same stream buffer works. In practice, the DS9k is probably the only system where it fails, although the two threads may effectively slow each other down because the buffer pointers end up on shared cache lines.

+2

Input and output sequences are essentially independent. There's a good chart at cppreference.com:

Diagram of streambuf members

The only thing the input and output sequences share is the locale object, which contains the codecvt facet used to perform text-encoding translation.

Theoretically, changing the text encoding mid-stream would be unsafe, but in practice libraries do not support that operation anyway!
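For what it's worth, the intended wiring then looks something like this (a sketch; duplex_socketbuf stands in for whatever socket streambuf you have, like the one sketched in the question):

    #include <istream>
    #include <ostream>
    #include <string>
    #include <thread>

    void run_duplex(int socket_fd) {
        duplex_socketbuf buf(socket_fd);   // your socket streambuf
        std::istream in(&buf);             // used only by the receiving thread
        std::ostream out(&buf);            // used only by the sending thread

        std::thread reader([&] {
            std::string line;
            while (std::getline(in, line)) { /* handle incoming data */ }
        });
        std::thread writer([&] {
            out << "hello\n" << std::flush;   // touches only the put area
        });
        writer.join();
        reader.join();
    }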

You should be fine.

+2

For full duplex you will need two buffers. If you use the streambuf interface for both, so that you can hook them up to the normal ostream and istream interfaces, then the full picture looks something like this:

Diagram of two cross-linked streambuf interfaces

The two buffers are obviously completely independent and symmetrical, so we can ignore one side and just focus on one buffer.

Moreover, we may assume there are only two threads: a reading thread and a writing thread. If more threads were involved, then two threads would be reading at the same time, or two would be writing at the same time, which would lead to undesirable race conditions and therefore makes no sense. We can assume the user has some mechanism that ensures only one thread at a time writes data to the stream buffer, and likewise only one thread at a time reads from it.

In the most general case, the actual buffer consists of several contiguous blocks of memory. Each put and get area lies entirely within one such block. As long as they are in different memory blocks, they are completely decoupled.

Each get/put area has three pointers: one that points to the beginning of the area (eback/pbase), one that points one byte past the end of the area (egptr/epptr), and one that points to the current position within the area (gptr/pptr). Each of these pointers can be read directly from a class derived from std::streambuf through the protected member functions of std::streambuf with the same names (eback(), pbase(), egptr(), epptr(), gptr() and pptr()). Note that here eback(), egptr() and gptr() belong to one streambuf and pbase(), epptr() and pptr() to the other streambuf (see the image above).
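To make the pointer bookkeeping concrete, here is a small sketch of how a derived class sets up the two areas and what the accessors then return (the class is only for illustration; in the cross-linked design above the get-area pointers would live in one streambuf and the put-area pointers in the other, but the relationships are the same):

    #include <cstddef>
    #include <streambuf>

    class area_demo_buf : public std::streambuf {
    public:
        area_demo_buf() {
            // Get area: eback() == gptr() == egptr() == in_ (empty, so the
            // first read triggers underflow()).
            setg(in_, in_, in_);
            // Put area: pbase() == pptr() == out_, epptr() == out_ + sizeof out_.
            setp(out_, out_ + sizeof out_);
        }

        std::size_t readable() const { return egptr() - gptr(); }   // bytes available to read
        std::size_t writable() const { return epptr() - pptr(); }   // free space left to write

    private:
        char in_[256];
        char out_[256];
    };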

std::streambuf has public member functions that access or modify these six pointers. They are:

Method          Changes and/or accesses
pubsetbuf()     Calls setbuf() of the most derived class
pubseekoff()    Calls seekoff() of the most derived class
pubseekpos()    Calls seekpos() of the most derived class
pubsync()       Calls sync() of the most derived class
in_avail()      Get area
snextc()        Calls sbumpc(), uflow() and/or sgetc()
sbumpc()        gptr, possibly calls uflow()
sgetc()         gptr, possibly calls underflow()
sgetn()         Calls xsgetn() of the most derived class
sputc()         pptr, possibly calls overflow()
sputn()         Calls xsputn() of the most derived class
sputbackc()     gptr, possibly calls pbackfail()
sungetc()       gptr, possibly calls pbackfail()

Protected Member Functions

Method          Changes and/or accesses
setbuf()        User defined (could be used for single-array buffers)
seekoff()       User defined (repositions the get area)
seekpos()       User defined (repositions the get area)
sync()          User defined (could do anything; depending on which buffer this is, could change either the get area or the put area)
showmanyc()     User defined (get area; if the put area uses the same allocated memory block, can also access pptr)
underflow()     User defined (get area, but also strongly coupled to the put area)
uflow()         Calls underflow() and advances gptr
xsgetn()        Get area (as if calling sbumpc() repeatedly), might call uflow()
gbump()         gptr
setg()          Get area
xsputn()        Put area (as if calling sputc() repeatedly), might call overflow() or do something similar
overflow()      Put area
pbump()         pptr
setp()          Put area
pbackfail()     User defined (might be pure horror; i.e., both get and put area)

We have to split the read and write actions into actions on a single (contiguous) block of memory. It is of course possible that a single call to, say, sputn() writes to several blocks, but we can lock and unlock around each per-block action.

There are several important buffer states, shown in the figure below. The green arrows represent transitions between states performed by the thread(s) that read data from the get area, while the blue arrows represent transitions performed by the thread(s) that write data to the put area. In other words, no two green actions can happen concurrently, and neither can two blue actions, but a green and a blue action may happen at the same time.

Read and write streambuf transitions

I still need to write an implementation for this, but my approach would be to use one mutex per buffer and lock it only briefly at the start of each action, to obtain the information needed to perform the read and/or write, and then lock it again at the end of the action to see whether anything was changed by the other thread and/or to finish the read/write with an administrative action.

Each time the writing thread bumps pptr, egptr is updated atomically, unless at the start of the write action eback != pbase, in which case egptr does not need to be updated, of course. To do this you have to lock the mutex before the bump and unlock it after updating egptr. The same mutex is therefore locked whenever the get or put area is moved. We can get away with not locking the mutex when bumping gptr itself, because if we do that, then there was data in the buffer at the start of the corresponding read action, and a concurrent write action does not change that, so there is no danger that the writing thread(s) will try to move the get area at the same time.
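As a sketch of what I have in mind for the writer side (simplified to a single derived streambuf whose get and put areas share one memory block, and with the gptr bump also taking the lock for simplicity, even though the argument above says it could be skipped):

    #include <mutex>
    #include <streambuf>

    class shared_block_buf : public std::streambuf {
    protected:
        // Writing thread: make n freshly written bytes visible to the reader.
        void commit(std::size_t n) {
            std::lock_guard<std::mutex> lock(mtx_);
            pbump(static_cast<int>(n));              // advance pptr
            if (eback() == pbase())                  // get area in the same block?
                setg(eback(), gptr(), pptr());       // publish the new egptr (== pptr)
        }

        // Reading thread: mark n bytes of the get area as consumed.
        void consume(std::size_t n) {
            std::lock_guard<std::mutex> lock(mtx_);
            gbump(static_cast<int>(n));              // advance gptr
        }

    private:
        std::mutex mtx_;   // one mutex per buffer
    };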

I will edit this answer when I find out more details.

0

Source: https://habr.com/ru/post/912164/

