I have a simple function that takes the product of a vector of Doubles. I am trying to parallelize this code, but many of the sparks end up fizzling. Here is a self-contained benchmark, which is also available as a gist:
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -O2 #-}
import Criterion.Main
import Control.Monad (when)
import Control.Parallel.Strategies (runEval,rpar,rseq)
import qualified Data.Vector.Primitive as PV
main :: IO ()
main = do
  let expected = PV.product numbers
  when (not (serialProduct numbers == expected)) $ do
    fail "serialProduct implementation incorrect"
  defaultMain
    [ bgroup "product"
      [ bench "serial" $ whnf serialProduct numbers
      , bench "parallel" $ whnf parallelProduct numbers
      ]
    ]
numbers :: PV.Vector Double
numbers = PV.replicate 10000000 1.00000001
serialProduct :: PV.Vector Double -> Double
serialProduct v =
  let !len = PV.length v
      go :: Double -> Int -> Double
      go !d !ix = if ix < len then go (d * PV.unsafeIndex v ix) (ix + 1) else d
   in go 1.0 0
-- | This only works when the vector length is a multiple of 8.
parallelProduct :: PV.Vector Double -> Double
parallelProduct v = runEval $ do
  let chunk = div (PV.length v) 8
  p2 <- rpar (serialProduct (PV.slice (chunk * 6) chunk v))
  p3 <- rpar (serialProduct (PV.slice (chunk * 7) chunk v))
  p1 <- rseq (serialProduct (PV.slice (chunk * 0) (chunk * 6) v))
  return (p1 * p2 * p3)
This can be built and run with:
ghc -threaded parallel_compute.hs
./parallel_compute +RTS -N4 -s
I have an eight-core box, so giving the runtime four capabilities with -N4 should be plenty. The benchmark results are not the most important part, but here they are:
benchmarking product/serial
time 11.40 ms (11.30 ms .. 11.53 ms)
0.999 R² (0.998 R² .. 1.000 R²)
mean 11.43 ms (11.37 ms .. 11.50 ms)
std dev 167.2 μs (120.4 μs .. 210.1 μs)
benchmarking product/parallel
time 10.03 ms (9.949 ms .. 10.15 ms)
0.999 R² (0.999 R² .. 1.000 R²)
mean 10.17 ms (10.11 ms .. 10.31 ms)
std dev 235.7 μs (133.4 μs .. 426.2 μs)
Now, the runtime statistics. This is the part that confuses me:
124,508,840 bytes allocated in the heap
529,843,176 bytes copied during GC
80,232,008 bytes maximum residency (8344 sample(s))
901,272 bytes maximum slop
83 MB total memory in use (0 MB lost due to fragmentation)
                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0        19 colls,    19 par    0.008s   0.001s     0.0001s    0.0003s
  Gen  1      8344 colls,  8343 par    2.916s   1.388s     0.0002s    0.0008s
Parallel GC work balance: 76.45% (serial 0%, perfect 100%)
TASKS: 13 (1 bound, 12 peak workers (12 total), using -N4)
SPARKS: 1024 (502 converted, 0 overflowed, 0 dud, 28 GC'd, 494 fizzled)
INIT time 0.000s ( 0.002s elapsed)
MUT time 11.480s ( 10.414s elapsed)
GC time 2.924s ( 1.389s elapsed)
EXIT time 0.004s ( 0.005s elapsed)
Total time 14.408s ( 11.811s elapsed)
Alloc rate 10,845,717 bytes per MUT second
Productivity 79.7% of total user, 88.2% of total elapsed
Here is what I do not understand. Roughly half of the sparks fizzle, and some even get GC'd. In parallelProduct, the main thread works on a slice six times larger than the slice handed to either spark, so both sparks should have ample time to be picked up by idle capabilities before the main thread finishes its own slice and demands their results. I do not see how they could fizzle or get collected. My only guess (and I am not confident in it) is that GHC somehow ends up evaluating the sparked serialProduct calls on the main thread anyway.

Also, the parallel version is only about 11% faster than the serial one. Since the main thread keeps 6/8 of the work and hands 1/8 to each of the two sparks, I would have expected the parallel run to take roughly 6/8 of the serial time, i.e. closer to a 25% improvement. What is going on here?
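For reference, my mental model of fizzling is the standard one: a spark fizzles when the thread that created it ends up forcing the thunk itself before any worker converts the spark. A toy sketch of that situation (not part of the gist; it reuses the imports above, and fizzleDemo is just an illustrative name):

fizzleDemo :: Double
fizzleDemo = runEval $ do
  x <- rpar (sum [1 .. 1000 :: Double]) -- sparked thunk
  y <- rseq (sum [1 .. 10 :: Double])   -- the creating thread's own share is tiny
  return (x + y) -- if the caller forces this sum before a worker picks up the spark, the spark fizzles

In parallelProduct the situation is the opposite: the creating thread's share dwarfs the sparked ones, which is why the fizzle count surprises me.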
Here is another implementation, also included in the gist, that uses forkIO and MVar directly (it additionally needs Control.Concurrent, Control.Exception (evaluate), and System.IO.Unsafe (unsafePerformIO)):
-- | This only works when the vector length is a multiple of 4.
parallelProductFork :: PV.Vector Double -> Double
parallelProductFork v = unsafePerformIO $ do
  let chunk = div (PV.length v) 4
  var <- newEmptyMVar
  _ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 0) chunk v)) >>= putMVar var
  _ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 1) chunk v)) >>= putMVar var
  _ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 2) chunk v)) >>= putMVar var
  _ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 3) chunk v)) >>= putMVar var
  a <- takeMVar var
  b <- takeMVar var
  c <- takeMVar var
  d <- takeMVar var
  return (a * b * c * d)
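To produce the numbers below, the benchmark group above gains one more entry along these lines (the "parallel mvar" name matches the Criterion output that follows):

      , bench "parallel mvar" $ whnf parallelProductFork numbers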
The results for this version:
benchmarking product/parallel mvar
time 3.814 ms (3.669 ms .. 3.946 ms)
0.986 R² (0.977 R² .. 0.992 R²)
mean 3.818 ms (3.708 ms .. 3.964 ms)
std dev 385.6 μs (317.1 μs .. 439.8 μs)
variance introduced by outliers: 64% (severely inflated)
Dropping down to the concurrency primitives gets me the kind of speedup I was expecting. So the question stands: why do the sparks in parallelProduct fizzle, and can the Strategies-based version be made to perform comparably?
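For reference, a more generic chunked Strategies formulation of the same idea (a sketch only, not in the gist; parallelProductChunked is a made-up name, parMap would also need to be imported from Control.Parallel.Strategies, and it assumes the vector length is a multiple of n):

parallelProductChunked :: Int -> PV.Vector Double -> Double
parallelProductChunked n v =
  let chunk  = div (PV.length v) n
      pieces = [ PV.slice (i * chunk) chunk v | i <- [0 .. n - 1] ]
   in product (parMap rseq serialProduct pieces)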