Tikhon's solution is the simplest, but it has one major drawback: it will not produce any results until the entire list is processed, and it will overflow if you process a list that is too large.
A solution closer to C # Rx would be to use a stream library like pipes .
For example, you can define a Producer that generates a String from user input:
import Control.Monad import Control.Proxy lines' :: (Proxy p) => () -> Producer p String IO r lines' () = runIdentityP $ forever $ do str <- lift getLine respond str
Then you can define a step that takes 10 lines:
take' :: (Monad m, Proxy p) => Int -> () -> Pipe paam () take' n () = runIdentityP $ replicateM_ n $ do a <- request () respond a
... and then the processing step:
proc :: (Monad m, Proxy p) => () -> Pipe p String (String, Int) mr proc () = runIdentityP $ forever $ do str <- request () respond (str, length str)
... and the final output stage:
print' :: (Proxy p, Show a) => () -> Consumer pa IO r print' () = runIdentityP $ forever $ do a <- request () lift $ print a
Now you can link them into a processing chain and run it:
main = runProxy $ lines' >-> take' 10 >-> proc >-> print'
... and it immediately gives the processed result after entering each line, and does not give the result as a batch at the end:
$ ./pipes Apple<Enter> ("Apple",5) Test<Enter> ("Test",4) 123<Enter> ("123",3) 4<Enter> ("4",1) 5<Enter> ("5",1) 6<Enter> ("6",1) 7<Enter> ("7",1) 8<Enter> ("8",1) 9<Enter> ("9",1) 10<Enter> ("10",2) $
In practice, you do not need to define these channels yourself. You can assemble the same chain of components in the standard pipes library:
>>> runProxy $ stdinS >-> takeB_ 10 >-> mapD (\x -> (x, length x)) >-> printD <exact same behavior>