As the OP points out, by now I may as well write a real answer. Let's start with the memory consumption.
Two useful references are the discussion of the memory footprint of Haskell data types and http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html . We'll also need to look at the definitions of some of our structures.
    -- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html
    data TMQueue a = TMQueue {-
The TQueue implementation uses a standard functional queue with a read end and a write end.
Let's establish an upper bound on memory usage by supposing that we read the entire file into the TMQueue before the consumer does anything. In that case, the write end of our TQueue will contain a list with one element per input line (stored as a ByteString). Each list node will look like
(:) bytestring tail
which takes 3 words (1 per field + 1 for the constructor). Each ByteString is 9 words, so adding the two together gives 12 words of overhead per line, not counting the actual data. Your test data is 5 million lines, so that's 60 million words of overhead for the whole file (plus some constants), which is about 460 MB on a 64-bit system (assuming I did my math right, which is always questionable). Add 40 MB for the actual data, and we get values close to what I see on my system.
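The arithmetic above can be checked with a few lines of Haskell. This is only a back-of-the-envelope sketch: the names (`overheadBytes`, `toMiB`, and so on) are made up for illustration, and the word counts are the ones quoted in the text, not values measured from a running program.

```haskell
-- Back-of-the-envelope estimate of the per-line overhead discussed above.
-- All names are illustrative; the word counts come from the text.

wordBytes :: Integer
wordBytes = 8                 -- one machine word on a 64-bit system

consCellWords :: Integer
consCellWords = 3             -- (:) constructor + head pointer + tail pointer

byteStringWords :: Integer
byteStringWords = 9           -- ByteString overhead, excluding the payload

overheadWordsPerLine :: Integer
overheadWordsPerLine = consCellWords + byteStringWords

-- Total overhead in bytes for a file with the given number of lines.
overheadBytes :: Integer -> Integer
overheadBytes nLines = nLines * overheadWordsPerLine * wordBytes

-- Convert a byte count to mebibytes (2^20 bytes).
toMiB :: Integer -> Double
toMiB bytes = fromIntegral bytes / (1024 * 1024)
```

In GHCi, `overheadBytes 5000000` evaluates to `480000000`, and `toMiB` of that is roughly 458, which is where the "about 460 MB" figure comes from (counting 2^20 bytes per megabyte).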
So why is our memory usage close to this upper bound? I have a theory (investigation is left as an exercise!). First, the producer is likely running a little faster than the consumer, because reading is usually faster than writing (I'm using spinning disks; an SSD might be different). Here is the definition of readTQueue:
    -- |Read the next value from the 'TQueue'.
    readTQueue :: TQueue a -> STM a
    readTQueue (TQueue read write) = do
      xs <- readTVar read
      case xs of
        (x:xs') -> do writeTVar read xs'
                      return x
        [] -> do ys <- readTVar write
                 case ys of
                   [] -> retry
                   _  -> case reverse ys of
                           [] -> error "readTQueue"
                           (z:zs) -> do writeTVar write []
                                        writeTVar read zs
                                        return z
First we try to read from the read end, and if that is empty we try to read from the write end, after reversing that list.
Here is what I think is happening: when the consumer needs to read from the write end, it has to traverse the whole input list inside the STM transaction. This takes some time, which causes it to contend with the producer. As the producer gets further ahead, the list grows longer, so the read takes even more time, during which the producer is able to write more values, which causes the read to fail. This process repeats until the producer finishes, and only then does the consumer get a chance to process the bulk of the data. Not only does this ruin concurrency, it also adds CPU overhead because the consumer transaction is continually retrying and failing.
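To make the cost of that traversal concrete, here is a pure model of a two-list functional queue. It is only a sketch under stated assumptions: the `Queue` type and the `push`/`pop` functions are hypothetical names, there is no STM involved, and it is not the stm library's code. It shows how much work a read has to do, not the contention itself.

```haskell
-- A pure model of a two-list functional queue (illustration only; this is
-- not the stm implementation and involves no transactions).
data Queue a = Queue [a] [a]   -- read end, write end (newest element first)

emptyQ :: Queue a
emptyQ = Queue [] []

-- Enqueue is O(1): cons onto the write end.
push :: a -> Queue a -> Queue a
push x (Queue r w) = Queue r (x : w)

-- Dequeue returns the element, the remaining queue, and the number of
-- elements that had to be reversed to serve this read (0 when the read
-- end was non-empty).
pop :: Queue a -> Maybe (a, Queue a, Int)
pop (Queue (x:r) w) = Just (x, Queue r w, 0)
pop (Queue [] w) =
  case reverse w of
    []     -> Nothing
    (z:zs) -> Just (z, Queue zs [], length w)
```

After 1000 calls to `push` with no intervening reads, the first `pop` reports a reversal cost of 1000 and the next one reports 0. In the STM version that first, expensive read is the one that has to complete inside a single transaction while the producer keeps modifying the write end.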
So what about unagi? There are a couple of key differences. First, unagi-chan uses arrays internally instead of lists. This reduces the overhead a little. Most of the overhead comes from the ByteString pointers, so the saving is small, but it is there. Second, unagi keeps chunks of arrays. Even if we pessimistically assume that the producer always wins contention, once an array is full it is pushed off the producer's side of the channel. Now the producer is writing to a new array while the consumer reads from the old one. This situation is near-ideal: there is no contention for shared resources, the consumer has good locality of reference, and because the consumer is working on a different chunk of memory there are no problems with cache coherence. Unlike my theoretical description of TMQueue, you now get concurrent operation, which lets some of the memory be freed along the way so that usage never reaches the upper bound.
As an aside, I think the batching in the consumer is not beneficial. Handles are already buffered by the I/O subsystem, so I don't think it gains anything. For me, performance improved slightly when I changed the consumer to operate line by line anyway.
Now, what can you do about this problem? Going from my working hypothesis that TMQueue is suffering from contention, and given your stated requirements, you just need to use a different type of queue. Obviously, unagi works very well. I also tried TMChan: it was about 25% slower than unagi but used 45% less memory, so it could be a good option too. (This is not surprising; TMChan has a different structure from TMQueue, so it will have different performance characteristics.)
You could also try changing your algorithm so that the producer sends multi-line chunks. This would lower the memory overhead from all the ByteStrings.
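One way the multi-line idea could look is sketched below. It assumes the producer already has the input as a list of strict `ByteString` lines and that the `bytestring` package is available (it ships with GHC); `chunksOf` and `chunkLines` are hypothetical helper names, not functions from the question's code.

```haskell
import qualified Data.ByteString.Char8 as B

-- Split a list into groups of at most n elements.
chunksOf :: Int -> [a] -> [[a]]
chunksOf n xs
  | n <= 0    = error "chunksOf: chunk size must be positive"
  | null xs   = []
  | otherwise = let (h, t) = splitAt n xs in h : chunksOf n t

-- Join every n lines into one newline-terminated ByteString, so the queue
-- holds one list cell and one ByteString header per chunk instead of one
-- of each per line.
chunkLines :: Int -> [B.ByteString] -> [B.ByteString]
chunkLines n = map B.unlines . chunksOf n
```

With a chunk size of 100, for example, the fixed per-element overhead is paid once per 100 lines. A consumer that only writes the data out can pass each chunk straight to `B.hPut`; one that needs individual lines can recover them with `B.lines`.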
So when is it OK to use TMQueue? If the producer and consumer run at about the same speed, or the consumer is faster, it should be fine. Also, if processing times are non-uniform, or the producer runs in bursts, you will probably get good amortized performance. Your case is pretty much the worst-case situation, and perhaps it should be reported as a bug against stm? I think if the read function were changed to
    -- |Read the next value from the 'TQueue'.
    readTQueue :: TQueue a -> STM a
    readTQueue (TQueue read write) = do
      xs <- readTVar read
      case xs of
        (x:xs') -> do writeTVar read xs'
                      return x
        [] -> do ys <- readTVar write
                 case ys of
                   [] -> retry
                   _  -> do writeTVar write []
                            let (z:zs) = reverse ys
                            writeTVar read zs
                            return z
it would avoid this problem. Now the z and zs bindings should both be evaluated lazily, so the list traversal would happen outside the transaction, allowing the read operation to succeed sometimes even under contention. That assumes I am correct about the problem in the first place, of course (and that this definition is lazy enough). There might be other unexpected downsides, though.
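Since stm does not export the TQueue constructor, the modified read cannot be dropped into user code as written. The sketch below therefore uses a hypothetical stand-in type, `LazyQueue`, with the same two-`TVar` layout, so the lazy variant can be loaded and tried. It assumes the `stm` package is available, and it only shows that the definition type-checks and keeps FIFO order; it says nothing about whether contention actually improves.

```haskell
import Control.Concurrent.STM

-- Hypothetical stand-in for TQueue: a read end and a write end.
data LazyQueue a = LazyQueue (TVar [a]) (TVar [a])

newLazyQueue :: STM (LazyQueue a)
newLazyQueue = LazyQueue <$> newTVar [] <*> newTVar []

-- Writing conses onto the write end, so that list is newest-first.
writeLazyQueue :: LazyQueue a -> a -> STM ()
writeLazyQueue (LazyQueue _ writeEnd) x = modifyTVar' writeEnd (x :)

-- The read from the text: the reversed list is bound lazily, so the
-- transaction stores unevaluated thunks and 'reverse' only runs when the
-- caller later demands the result.
readLazyQueue :: LazyQueue a -> STM a
readLazyQueue (LazyQueue readEnd writeEnd) = do
  xs <- readTVar readEnd
  case xs of
    (x:xs') -> do writeTVar readEnd xs'
                  return x
    [] -> do
      ys <- readTVar writeEnd
      case ys of
        [] -> retry
        _  -> do
          writeTVar writeEnd []
          -- Lazy pattern binding; not forced inside the transaction.
          let (z, zs) = case reverse ys of
                (h:t) -> (h, t)
                []    -> error "readLazyQueue: unreachable, ys is non-empty"
          writeTVar readEnd zs
          return z
```

A quick check in GHCi: create a queue with `atomically newLazyQueue`, write a few values with `writeLazyQueue`, and read them back with `readLazyQueue`; they come out in the order they were written.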