As for (1), the general solution is to change the Pipe type to:
    Pipe (Either (Context, Int) Foo) Bar IO ()
In other words, it accepts both Foo inputs and tune requests, which it processes internally.
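To make that concrete, here is a minimal sketch of such a pipe. The definitions of `Context`, `Foo`, and `Bar` are hypothetical stand-ins (the real types are not shown here), and "tuning" is modelled as simply replacing an `Int` multiplier; a real encoder would presumably call its tune function with `lift` at that point.

```haskell
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-ins for the real types.
type Context = String
newtype Foo = Foo Int
newtype Bar = Bar Int deriving (Eq, Show)

-- Left values are tune requests, Right values are ordinary inputs.
encoder :: Pipe (Either (Context, Int) Foo) Bar IO ()
encoder = go 1
  where
    go setting = do
      msg <- await
      case msg of
        -- A tune request only changes the internal setting.
        Left (_context, newSetting) -> go newSetting
        -- An ordinary input is encoded with the current setting.
        Right (Foo x) -> do
          yield (Bar (x * setting))
          go setting

runDemo :: IO [Bar]
runDemo = P.toListM (each inputs >-> encoder)
  where
    inputs = [Right (Foo 1), Left ("gain", 10), Right (Foo 2)]

main :: IO ()
main = runDemo >>= print  -- prints [Bar 1,Bar 20]
```

Because the setting lives inside the pipe's own loop, a tune request can only take effect between two inputs, never in the middle of one.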
So, let's assume that you have two concurrent Producers, one for the inputs and one for the tune requests:
    producer1 :: Producer Foo IO ()
    producer2 :: Producer (Context, Int) IO ()
You can use pipes-concurrency to create a buffer that both of them feed into, for example:
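    import Control.Concurrent.Async (async, wait)
    import Pipes
    import Pipes.Concurrent
    import qualified Pipes.Prelude

    example = do
        (output, input) <- spawn Unbounded
        -- input  :: Input  (Either (Context, Int) Foo)
        -- output :: Output (Either (Context, Int) Foo)

        -- Each producer tags its values and writes them to the shared buffer
        let io1 = runEffect $ producer1 >-> Pipes.Prelude.map Right >-> toOutput output
            io2 = runEffect $ producer2 >-> Pipes.Prelude.map Left  >-> toOutput output
        as <- mapM async [io1, io2]

        -- A single thread drains the buffer through the pipe
        runEffect (fromInput input >-> yourPipe >-> someConsumer)
        mapM_ wait as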
You can learn more about the pipes-concurrency library by reading this tutorial.
By making all tuning requests go through the same single-threaded Pipe, you can make sure that you do not have two simultaneous calls to the tune function.
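For reference, here is a self-contained sketch of the merging step that can be run on its own. It makes a few assumptions that differ from the snippet above: the producers are small hypothetical lists, `Context` is replaced by `String`, and it uses `spawn'` with an explicit seal (instead of relying on garbage collection) so that draining the buffer is guaranteed to terminate.

```haskell
import Control.Concurrent.Async (async, wait)
import Control.Concurrent.STM (atomically)
import Data.Either (partitionEithers)
import Pipes
import Pipes.Concurrent (fromInput, spawn', toOutput, unbounded)
import qualified Pipes.Prelude as P

-- Returns the tune requests and the inputs that came out of the buffer.
runDemo :: IO ([(String, Int)], [Int])
runDemo = do
  (output, input, seal) <- spawn' unbounded
  -- Run a producer in its own thread, writing into the shared buffer.
  let feed producer = async (runEffect (producer >-> toOutput output))
  inputs   <- feed (each [1, 2, 3] >-> P.map Right)
  settings <- feed (each [("gain", 10)] >-> P.map Left)
  -- Once both producers are done, seal the buffer so the reader stops
  -- after the last buffered value.
  mapM_ wait [inputs, settings]
  atomically seal
  -- A single reader sees every value, one at a time.
  merged <- P.toListM (fromInput input)
  return (partitionEithers merged)

main :: IO ()
main = runDemo >>= print
```

The interleaving between the two producers is not deterministic, but each producer's own values stay in order, and everything downstream of `fromInput input` runs in one thread.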
As for (2), there are two ways to acquire a resource using pipes. The more sophisticated approach is to use the pipes-safe library, which provides a bracket function that you can use within a Pipe, but this is probably overkill for your purpose: it exists mainly for acquiring and releasing multiple resources over the lifetime of a pipe. A simpler solution is to use the following with idiom to acquire the pipe:
    import Control.Exception (bracket)

    withEncoder :: (Pipe Foo Bar IO () -> IO r) -> IO r
    withEncoder k = bracket acquire release $ \resource -> do
        k (createPipeFromResource resource)
Then the user simply writes:
    withEncoder $ \yourPipe -> do
        runEffect (someProducer >-> yourPipe >-> someConsumer)
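Here is a runnable sketch of the same idiom, under stated assumptions: the "resource" is just a hypothetical `Int` scaling factor, the pipe multiplies its inputs by it, and an `IORef` log stands in for real acquire and release actions so that their order can be observed.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical with-function: the resource is a scaling factor.
withScaler :: IORef [String] -> (Pipe Int Int IO () -> IO r) -> IO r
withScaler logRef k =
  bracket acquire release (\factor -> k (P.map (* factor)))
  where
    acquire   = modifyIORef logRef ("acquire" :) >> return (10 :: Int)
    release _ = modifyIORef logRef ("release" :)

runDemo :: IO ([Int], [String])
runDemo = do
  logRef <- newIORef []
  out <- withScaler logRef $ \pipe ->
    P.toListM (each [1, 2, 3] >-> pipe)
  events <- reverse <$> readIORef logRef
  return (out, events)

main :: IO ()
main = runDemo >>= print  -- prints ([10,20,30],["acquire","release"])
```

The pipe is only valid inside the callback: `bracket` releases the resource as soon as the callback returns or throws.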
You can additionally use the managed package, which simplifies the types a bit and makes it easier to acquire multiple resources. You can learn more about it by reading this blog post.
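As a rough illustration of what that looks like, the sketch below wraps a with-style function in `Managed` and acquires two resources in sequence. `withScaler`, its name tags, and the scaling factors are hypothetical; only `managed` and `with` come from the managed package.

```haskell
import Control.Exception (bracket_)
import Control.Monad.Managed (Managed, managed, with)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical with-function that logs when its resource is held.
withScaler :: IORef [String] -> String -> Int
           -> (Pipe Int Int IO () -> IO r) -> IO r
withScaler logRef name factor k =
  bracket_ (note "acquire") (note "release") (k (P.map (* factor)))
  where
    note what = modifyIORef logRef ((what ++ " " ++ name) :)

-- Two resources acquired one after the other, without nested callbacks.
pipeline :: IORef [String] -> Managed (Pipe Int Int IO ())
pipeline logRef = do
  p1 <- managed (withScaler logRef "first" 10)
  p2 <- managed (withScaler logRef "second" 2)
  return (p1 >-> p2)

runDemo :: IO ([Int], [String])
runDemo = do
  logRef <- newIORef []
  out <- with (pipeline logRef) $ \pipe ->
    P.toListM (each [1, 2, 3] >-> pipe)
  events <- reverse <$> readIORef logRef
  return (out, events)

main :: IO ()
main = runDemo >>= print
```

The resources are released in the reverse order of acquisition, exactly as they would be with hand-nested with-functions.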