I am sending simple UDP packets to this Haskell server. For the package source, I use a simple text file generated by "aspell -l en dump master". However, any list of over 120,000 messages should work. If I run the user and the manufacturer at the same time, I do not lose packages. However, I want to simulate a very busy consumer. If I enter threadDelay 20 seconds before the consumer starts, I get packet loss. This counter is intuitive for me, because I do less with standard and disk IO when I delay consumption. Can someone explain why I get losses with a delay? How can I manage sockets and TChan to work better so as not to lose (only higher memory usage) while my consumer is very busy?
import Control.Monad (forever) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.STM (writeTChan, readTChan, atomically) import Control.Concurrent.STM.TChan import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket.ByteString import Data.ByteString hiding(putStrLn, head) import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr) import System.IO main :: IO () main = withSocketsDo $ do hSetBuffering stdout NoBuffering addrinfos <- getAddrInfo (Just (defaultHints {addrFlags = [AI_PASSIVE]})) Nothing (Just "2000") let serveraddr = head addrinfos sock <- socket (addrFamily serveraddr) Datagram defaultProtocol bindSocket sock (addrAddress serveraddr) chan <- newTChanIO forkIO(producer chan sock) -- Uncomment the threadDelay below to see lossy version -- threadDelay (1000000 * 20) forkIO(consumer chan) forever $ threadDelay (1000000 * 60) producer :: TChan ByteString -> Socket -> IO () producer chan sock = forever $ do (msg) <- recv sock 256 atomically $ writeTChan chan msg consumer :: TChan ByteString -> IO () consumer chan = forever $ do msg <- atomically $ readTChan chan Char8.putStr msg
source share