There are two solutions based on pipes , and I will let you choose which one you prefer.
Note. It is not clear why you list downstream on the interface, and not just return it directly as a result.
Conduit style
The first, which is very close to a conduit based solution, uses the upcoming pipes-pase , which is mostly complete and requires documentation. You can find the latest project on Github.
Using pipes-parse , the solution is identical to the conduit solution that Peter gave:
import Control.Proxy import Control.Proxy.Parse combine :: (Monad m, Proxy p) => () -> Pipe (StateP [Maybe a] p) (Maybe a) [a] m () combine () = loop [] where loop as = do ma <- draw case ma of Nothing -> respond (reverse as) Just a -> loop (a:as)
draw is similar to conduit await : it requests a value from the remainder buffer (this is part of StateP ) or from the upstream if the buffer is empty. Nothing indicates the end of the file.
You can wrap a pipe that does not have a file end using the pipes-parse wrap function, which is of type:
wrap :: (Monad m, Proxy p) => pa' ab' bmr -> pa' ab' (Maybe b) ms
Classic Pipe Style
The second option is a bit simpler. If you want to reset a given channel, you can do it directly using WriterP :
import Control.Proxy import Control.Proxy.Trans.Writer foldIt :: (Monad m, Proxy p) => (() -> Pipe pabm ()) -> () -> Pipe pa [b] m () foldIt p () = runIdentityP $ do r <- execWriterK (liftP . p >-> toListD >-> unitU) () respond r
This is a more detailed description of what is happening, but it requires passing in the pipe as an explicit argument. It is up to you who you prefer.
By the way, that’s why I asked why you want to send one value downstream. The above is much simpler if you return a folded list:
foldIt p = execWriterK (liftP . p >-> toListD)
liftP might not even be needed if p is completely polymorphic in its proxy type. I only include it as a precaution.
Bonus Solution
The reason pipes-parse does not stipulate that toOneBigList is that it always contains an anti-pattern for grouping the results into a list. pipes has some nice features that allow you to never group input into a list, even if you are trying to provide multiple lists. For example, using the respond composition, you might have a proxy server that gives a subset of the thread that it would go through, and then inserts a handler that uses this subset:
example :: (Monad m, Proxy p) => () -> Pipe pa (() -> Pipe paam ()) mr example () = runIdentityP $ forever $ do respond $ \() -> runIdentityP $ replicateM_ 3 $ request () >>= respond printIt :: (Proxy p, Show a) => () -> Pipe paa IO r printIt () = runIdentityP $ do lift $ putStrLn "Here we go!" printD () useIt :: (Proxy p, Show a) => () -> Pipe paa IO r useIt = example />/ (\p -> (p >-> printIt) ())
Here is an example of how to use it:
>>> runProxy $ enumFromToS 1 10 >-> useIt Here we go! 1 2 3 Here we go! 4 5 6 Here we go! 7 8 9 Here we go! 10
This means that you never need to enter one element into memory, even if you need to group elements.