How to determine the end of input using pipes

I am trying to read a group of 50 elements from a pipe and process them in an I / O action at the same time. (In this case, I am trying to insert data into the database, and I want to make a whole batch inside a single transaction, because it is much more efficient). Here is a simplified version of what I still have:

type ExampleType = Int doSomething :: [ExampleType] -> IO () doSomething = undefined inGroupsOf50 :: Monad m => Producer ExampleType m () -> m () inGroupsOf50 input = runEffect $ input >-> loop where loop = do entries <- replicateM 50 await lift $ doSomething entries --Insert a bunch all in one transaction loop 

The problem is, as far as I can tell, if the number of elements to be inserted is divided by 50, I will skip some. What I really want instead of replicateM 50 await is that it gives me up to 50 elements or less if the input ends, but I cannot figure out how to write it.

I thought pipes-parse might be the right library to view. draw has a promising signature ... but so far all the bits don't fit together in my head. I have a producer , I write consumer , and I really don’t understand how this relates to the concept of parser .

+5
source share
1 answer

Even more pipes-parse you can take a look at pipes-group . In particular, we consider the function

 -- this type is slightly specialized chunksOf :: Monad m => Int -> Lens' (Producer amx) (FreeT (Producer am) mx) 

Lens' bit can be intimidating, but can be quickly eliminated: it claims that we can convert Producer amx to FreeT (Producer am) mx [0]

 import Control.Lens (view) chunkIt :: Monad m => Int -> Producer amx -> FreeT (Producer am) mx chunkIt n = view (chunksOf n) 

So now we need to figure out what to do with this FreeT bit. In particular, we will want to look into the free package and pull out the iterT function

 iterT :: (Functor f, Monad m) => (f (ma) -> ma) -> (FreeT fma -> ma) 

This function, iterT , will “consume” a FreeT one “step” at a time. To understand this, we will first specialize the iterT type to replace f with Producer am

 runChunk :: Monad m => (Producer am (mx) -> mx) -> (FreeT (Producer am) mx -> mx) runChunk = iterT 

In particular, runChunk can "run" a FreeT full Producer , while we talk about how to convert a Producer am (mx) to m -action. This may seem more familiar. When we define the first argument of runChunk , we just need to execute Producer , which in this case will contain no more than the selected number of elements.

But what about the spectacular return value mx ? This is a "sequel", for example. all the pieces that come after the current one you are writing. For example, suppose we have a Producer from Char , and we would like to print and split the line after three characters

 main :: IO () main = flip runChunk (chunkIt 3 input) $ \p -> _ 

The hole _ at this point is of type IO () with p in context in type p :: Producer Char IO (IO ()) . We can use this channel with for , collect its return type (which is a continuation, again), emit a new line, and then start the continuation.

 input :: Monad m => Producer Char m () input = each "abcdefghijklmnopqrstuvwxyz" main :: IO () main = flip runChunk (chunkIt 3 input) $ \p -> do cont <- runEffect $ for p (lift . putChar) putChar '\n' cont 

And it behaves exactly as we would like

 λ> main abc def ghi jkl mno pqr stu vwx yz 

To be clear, while I made a little presentation, this is pretty simple code, as soon as you see how all the parts match. Here is a list of everything:

 input :: Monad m => Producer Char m () input = each "abcdefghijklmnopqrstuvwxyz" main :: IO () main = flip iterT (input ^. chunksOf 3) $ \p -> do cont <- runEffect $ for p $ \c -> do lift (putChar c) putChar '\n' cont 

[0] Also a bit more, but that’s enough at the moment.

+11
source

Source: https://habr.com/ru/post/1210060/