Strict evaluation techniques for concurrent channels in Haskell

I am working with Haskell threads, and I have run into the problem of passing lazily evaluated values across a channel. For example, with N worker threads and 1 output thread, the workers send unevaluated work, and the output thread ends up doing the work for them.

I have read about this problem in various places and seen various solutions, but I found only one that works; the rest do not. Below is some code in which worker threads start a computation that can take a long time. I start the threads in descending order of workload, so that the first thread should take the longest and later threads should finish earlier.

    import Control.Concurrent (forkIO)
    import Control.Concurrent.Chan -- .Strict
    import Control.Concurrent.MVar
    import Control.Exception (finally, evaluate)
    import Control.Monad (forM_)
    import Control.Parallel.Strategies (using, rdeepseq)

    main = (>>=) newChan $ (>>=) (newMVar []) . run

    run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
    run logCh statVars = do
      logV <- spawn1 readWriteLoop
      say "START"
      forM_ [18,17..10] $ spawn . busyWork
      await
      writeChan logCh Nothing -- poison the logger
      takeMVar logV
      putStrLn "DONE"
      where
        say mesg = force mesg >>= writeChan logCh . Just

        force s = mapM evaluate s  -- works
    --    force s = return $ s `using` rdeepseq  -- no difference
    --    force s = return s -- no-op; try this with strict channel

        busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
        embiggen i = i*i*i*i*i

        readWriteLoop = readChan logCh >>= writeReadLoop
        writeReadLoop Nothing = return ()
        writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

        spawn1 action = do
          v <- newEmptyMVar
          forkIO $ action `finally` putMVar v ()
          return v

        spawn action = do
          v <- spawn1 action
          modifyMVar statVars $ \vs -> return (v:vs, ())

        await = do
          vs <- modifyMVar statVars $ \vs -> return ([], vs)
          mapM_ takeMVar vs

With most techniques, the results are reported in the order spawned, i.e. the longest-running computation first. I interpret this to mean that the output thread is doing all the work:

    -- results in order spawned (longest-running first = broken)
    START
    892616806655
    503999185040
    274877906943
    144162977343
    72313663743
    34464808608
    15479341055
    6484436675
    2499999999
    DONE

I thought the answer to this would be strict channels, but they did not work. I understand that WHNF is not enough for strings, because it only forces the outermost constructor (nil or cons for the first character of the string). rdeepseq is supposed to evaluate fully, but it makes no difference. The only thing I found that works is to map Control.Exception.evaluate :: a -> IO a over all the characters in the string. (See the comments on the force function in the code for several alternatives.) Here is the result with Control.Exception.evaluate:

    -- results in order finished (shortest-running first = correct)
    START
    2499999999
    6484436675
    15479341055
    34464808608
    72313663743
    144162977343
    274877906943
    503999185040
    892616806655
    DONE

So why don't strict channels or rdeepseq produce this result? Are there other techniques? Am I misinterpreting why the first result is broken?

1 answer

There are two issues going on here.

The reason the first attempt (using an explicit rnf) does not work is that, by using return, you have created a thunk that fully evaluates itself when it is evaluated, but the thunk itself has not been evaluated. Note that the type of evaluate is a -> IO a: the fact that it returns a value in IO means that evaluate can impose ordering:

    return (error "foo") >> return 1 == return 1
    evaluate (error "foo") >> return 1 == error "foo"

The result is that this code:

 force s = evaluate $ s `using` rdeepseq 

will work (as in, it has the same behavior as mapM_ evaluate s ).
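
The difference can be observed directly. The following is only a minimal sketch: it assumes `force` from the deepseq package as a stand-in for `using` with `rdeepseq`, and the names `lazyForce`, `eagerForce` and `blowsUp` are made up for illustration. An error is hidden in the tail of a string, and each variant is checked for whether the error fires while the IO action itself runs.

```haskell
import Control.DeepSeq (force)
import Control.Exception (ErrorCall, evaluate, try)

-- Hypothetical helper: returns a thunk that would deep-evaluate the string,
-- without running it (comparable to: return $ s `using` rdeepseq).
lazyForce :: String -> IO String
lazyForce s = return (force s)

-- Hypothetical helper: evaluate runs the deep evaluation before the IO
-- action completes (comparable to: evaluate $ s `using` rdeepseq).
eagerForce :: String -> IO String
eagerForce s = evaluate (force s)

-- True if running the action itself raises an ErrorCall.
-- The action's result is never inspected, so a returned thunk stays unevaluated.
blowsUp :: IO String -> IO Bool
blowsUp act = do
  r <- try act :: IO (Either ErrorCall String)
  return (either (const True) (const False) r)

main :: IO ()
main = do
  let s = "ab" ++ error "hidden in the tail"
  blowsUp (lazyForce s)  >>= print  -- False: the thunk was only returned
  blowsUp (eagerForce s) >>= print  -- True: the string was forced in place
```

In the question's program, the `return` variant corresponds to `lazyForce`: the worker puts the thunk on the channel, and whoever later consumes the string pays for it.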


The strict channel case is a little trickier, but I believe it is due to a bug in strict-concurrency. The expensive computation is actually being run on the worker threads, but that does not do you much good (you can check this explicitly by hiding some asynchronous exceptions in your strings and seeing which thread the exception surfaces on).

What is the bug? Let's look at the code for the strict writeChan:

    writeChan :: NFData a => Chan a -> a -> IO ()
    writeChan (Chan _read write) val = do
      new_hole <- newEmptyMVar
      modifyMVar_ write $ \old_hole -> do
        putMVar old_hole $! ChItem val new_hole
        return new_hole

We see that modifyMVar_ is called on write before the thunk is evaluated. The sequence of operations is therefore:

  • We enter writeChan
  • We takeMVar write (blocking anyone else who wants to write to the channel)
  • We evaluate the expensive thunk
  • We put the expensive thunk onto the channel
  • We putMVar write, unblocking all the other threads

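You do not see this behavior with the evaluate variants because they perform the evaluation before the lock is acquired. With the strict channel, every other writer waits while one thread evaluates under the lock, so the expensive computations are effectively serialized in the order the writers arrived.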
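
To get the evaluate-before-lock ordering regardless of which channel is used, the value can be forced on the worker before writeChan is called. This is a minimal sketch under stated assumptions: it uses the ordinary Control.Concurrent.Chan from base and `force` from the deepseq package, and `writeChanForced` is a hypothetical helper, not part of strict-concurrency and not the patch mentioned below.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.DeepSeq (NFData, force)
import Control.Exception (evaluate)

-- Hypothetical helper: fully evaluate the value on the calling thread,
-- then enqueue it. The channel's internal lock is only taken inside
-- writeChan, i.e. after the expensive work has already been done.
writeChanForced :: NFData a => Chan a -> a -> IO ()
writeChanForced ch x = evaluate (force x) >>= writeChan ch

main :: IO ()
main = do
  ch <- newChan
  -- Same shape of work as busyWork in the question, for i = 10.
  writeChanForced ch (show (sum (filter odd [2 .. 100000 :: Integer])))
  readChan ch >>= putStrLn
```

With this helper the writers only contend for the lock while enqueueing an already evaluated value, which is the same situation as the `mapM evaluate` variant in the question.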

I'll send Don mail about this and see if he agrees that this behavior is suboptimal.

Don agrees that this behavior is suboptimal. We are working on a patch.


Source: https://habr.com/ru/post/1342524/

