I work with Haskell streams, and I am faced with the problem of passing lazily evaluated values ββacross a channel. For example, with N workflows and 1 output stream, workers report invaluable work, and the output stream completes the work for them.
I read about this problem in various documents and saw various solutions, but I found only one solution that works and the rest do not. Below is the code in which workflows start some calculations that can take a lot of time. I start threads in descending order, so that the first thread should take the longest, and later threads should end earlier.
import Control.Concurrent (forkIO) import Control.Concurrent.Chan -- .Strict import Control.Concurrent.MVar import Control.Exception (finally, evaluate) import Control.Monad (forM_) import Control.Parallel.Strategies (using, rdeepseq) main = (>>=) newChan $ (>>=) (newMVar []) . run run :: Chan (Maybe String) -> MVar [MVar ()] -> IO () run logCh statVars = do logV <- spawn1 readWriteLoop say "START" forM_ [18,17..10] $ spawn . busyWork await writeChan logCh Nothing -- poison the logger takeMVar logV putStrLn "DONE" where say mesg = force mesg >>= writeChan logCh . Just force s = mapM evaluate s -- works -- force s = return $ s `using` rdeepseq -- no difference -- force s = return s -- no-op; try this with strict channel busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen embiggen i = i*i*i*i*i readWriteLoop = readChan logCh >>= writeReadLoop writeReadLoop Nothing = return () writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop spawn1 action = do v <- newEmptyMVar forkIO $ action `finally` putMVar v () return v spawn action = do v <- spawn1 action modifyMVar statVars $ \vs -> return (v:vs, ()) await = do vs <- modifyMVar statVars $ \vs -> return ([], vs) mapM_ takeMVar vs
Using most methods, the results are reported in the order generated; i.e. the longest calculation first. I interpret this as meaning that the output stream does all the work:
-- results in order spawned (longest-running first = broken) START 892616806655 503999185040 274877906943 144162977343 72313663743 34464808608 15479341055 6484436675 2499999999 DONE
I thought that the answer to this question would be strict channels , but they did not work. I understand that WHNF is not enough for strings, because it just forces an external constructor (zero or minus for the first character of the string). rdeepseq fully appreciated, but that doesn't make any difference. The only thing I found that works is to map Control.Exception.evaluate :: a -> IO a for all characters in the string. (See comments of the force function in the code for several different alternatives.) Here is the result with Control.Exception.evaluate :
-- results in order finished (shortest-running first = correct) START 2499999999 6484436675 15479341055 34464808608 72313663743 144162977343 274877906943 503999185040 892616806655 DONE
So why don't strict channels or rdeepseq produce this result? Are there other methods? I misunderstood why the first result is broken?