Producer/Consumer Problem, Buffered Stream

I am trying to write a buffer manager that manages 3 streams. Typical use will be with a slow producer and a fast consumer. The idea behind the three buffers is that the producer ALWAYS has a buffer to write to, and the consumer ALWAYS receives the latest data.
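
To make the three-buffer idea concrete, here is a minimal single-threaded sketch of how the slots could change roles. It deliberately has no locking, and the names SlotRotation, Publish and Acquire are hypothetical, chosen only for this illustration.

```csharp
using System;

// Illustration only: single-threaded, no locking. It shows how the three
// slots change roles. All names here are hypothetical.
public class SlotRotation
{
    private int[] write = new int[1];
    private int[] swap = new int[1];
    private int[] read = new int[1];

    // Producer side: store a value, then hand the filled slot over via swap.
    public void Publish(int value)
    {
        write[0] = value;
        int[] temp = swap;
        swap = write;
        write = temp;
    }

    // Consumer side: take whatever was published most recently.
    public int Acquire()
    {
        int[] temp = swap;
        swap = read;
        read = temp;
        return read[0];
    }
}

public static class SlotRotationDemo
{
    public static void Main()
    {
        var slots = new SlotRotation();
        slots.Publish(1);
        slots.Publish(2); // the consumer has not read yet, so 1 is superseded
        Console.WriteLine(slots.Acquire()); // prints 2
    }
}
```

The producer never has to wait for a free slot, and a value the consumer did not pick up in time is simply replaced by the newer one.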

Here is what I have so far; it sort of works.

using System;
using System.IO;
using System.Threading;

namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private Stream writebuf; ///<value>The stream used for writing</value>
        private Stream readbuf; ///<value>The stream used for reading</value>
        private Stream swapbuf; ///<value>The stream used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking whether this is the first time a write buffer is requested</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with expandable buffers
        /// </summary>
        public YariIFStream()
        {
            sync = new Object();
            firsttime = true;

            writebuf = new MemoryStream();
            readbuf = new MemoryStream();
            swapbuf = new MemoryStream();
        }

        /// <summary>
        /// Returns the stream with the buffer with new data ready to be read
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetReadBuffer()
        {
            lock (sync)
            {
                Monitor.Wait(sync);
                Stream tempbuf = swapbuf;
                swapbuf = readbuf;
                readbuf = tempbuf;
            }
            return readbuf;
        }

        /// <summary>
        /// Returns the stream with the buffer ready to be written with data
        /// </summary>
        /// <returns>Stream</returns>
        public Stream GetWriteBuffer()
        {
            lock (sync)
            {
                Stream tempbuf = swapbuf;
                swapbuf = writebuf;
                writebuf = tempbuf;
                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;

                }
            }
            //Thread.Sleep(1);
            return writebuf;
        }

    }
}

The firsttime check is there because the first time a write buffer is requested, the consumer must not be pulsed: no buffer has been written yet. When a write buffer is requested a second time, we can be sure that the previous buffer contains data.
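
A side note on this signalling: Monitor.Pulse only wakes a thread that is already waiting, so a pulse sent while the consumer is busy elsewhere is not remembered. A minimal sketch of one way to make the hand-off explicit, assuming a hypothetical hasNewData flag and a hypothetical class name TripleBufferSketch (neither is in the code above), might look like this:

```csharp
using System;
using System.Threading;

// Sketch only: same three-buffer idea, but the consumer waits on a flag
// (hasNewData, a hypothetical name) instead of on a bare Monitor.Wait.
public class TripleBufferSketch
{
    private byte[] writebuf, readbuf, swapbuf;
    private bool firsttime = true; // nothing has been written before the first request
    private bool hasNewData;       // true while swapbuf holds data the consumer has not taken
    private readonly object sync = new object();

    public TripleBufferSketch(int capacity)
    {
        writebuf = new byte[capacity];
        readbuf = new byte[capacity];
        swapbuf = new byte[capacity];
    }

    public byte[] GetWriteBuffer()
    {
        lock (sync)
        {
            byte[] tempbuf = swapbuf;
            swapbuf = writebuf;
            writebuf = tempbuf;
            if (!firsttime)
            {
                hasNewData = true;   // remember the hand-off even if nobody is waiting yet
                Monitor.Pulse(sync);
            }
            else
            {
                firsttime = false;
            }
            return writebuf;
        }
    }

    public byte[] GetReadBuffer()
    {
        lock (sync)
        {
            while (!hasNewData)      // re-check the condition after every wake-up
            {
                Monitor.Wait(sync);
            }
            byte[] tempbuf = swapbuf;
            swapbuf = readbuf;
            readbuf = tempbuf;
            hasNewData = false;
            return readbuf;
        }
    }
}

public static class TripleBufferSketchDemo
{
    public static void Main()
    {
        var buffers = new TripleBufferSketch(1);
        byte[] w = buffers.GetWriteBuffer(); // first request: nothing to publish yet
        w[0] = 42;
        buffers.GetWriteBuffer();            // second request publishes the filled buffer
        Console.WriteLine(buffers.GetReadBuffer()[0]); // prints 42
    }
}
```

In this sketch the read call returns immediately when data was published before the consumer arrived, because the flag, not the pulse, carries the information.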

I have two threads, one producer and one consumer. This is my output:

prod: uv_hjd`alv   cons: N/<]g[)8fV
prod: N/<]g[)8fV   cons: 5Ud*tJ-Qkv
prod: 5Ud*tJ-Qkv   cons: 4Lx&Z7qqjA
prod: 4Lx&Z7qqjA   cons: kjUuVyCa.B
prod: kjUuVyCa.B

  • Thread.Sleep(1); GetWriteBuffer() . - .


Replacing the Stream buffers with byte[] arrays gives this version:

using System;
using System.Threading;

/// <summary>
/// This namespace provides a cross-thread, concurrency-safe buffer manager.
/// </summary>
namespace YariIfStream
{

    /// <summary>
    /// A class that manages three buffers used for IF data streams
    /// </summary>
    public class YariIFStream
    {
        private byte[] writebuf; ///<value>The buffer used for writing</value>
        private byte[] readbuf; ///<value>The buffer used for reading</value>
        private byte[] swapbuf; ///<value>The buffer used for swapping</value>
        private bool firsttime; ///<value>Boolean used for checking whether this is the first time a write buffer is requested</value>
        private Object sync; ///<value>Object used for syncing</value>

        /// <summary>
        /// Initializes a new instance of the Yari.YariIFStream class with buffers of the specified initial capacity
        /// </summary>
        /// <param name="capacity">Initial capacity of the buffers</param>
        public YariIFStream(int capacity)
        {
            sync = new Object();
            firsttime = true;

            writebuf = new byte[capacity];
            readbuf = new byte[capacity];
            swapbuf = new byte[capacity];
        }

        /// <summary>
        /// Returns the buffer with new data ready to be read
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetReadBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                Monitor.Wait(sync);
                tempbuf = swapbuf;
                swapbuf = readbuf;
            }
            readbuf = tempbuf;

            return readbuf;
        }

        /// <summary>
        /// Returns the buffer ready to be written with data
        /// </summary>
        /// <returns>byte[]</returns>
        public byte[] GetWriteBuffer()
        {
            byte[] tempbuf;
            lock (sync)
            {
                tempbuf = swapbuf;
                swapbuf = writebuf;

                writebuf = tempbuf;

                if (!firsttime)
                {
                    Monitor.Pulse(sync);
                }
                else
                {
                    firsttime = false;
                }
            }
            return writebuf;
        }
    }
}

Source: https://habr.com/ru/post/1756558/

