I ended up using conduits. The generic processFeed function acts as the sink (consumer), and data is fed into it either from postUrlSource or from Data.Conduit.Binary.sourceFile, depending on the mode.
```haskell
import Data.Conduit.Binary as CB (sourceFile, conduitFile, lines)

processFeed :: MonadIO m => Config -> OwnerId -> (OwnerId -> [Post] -> IO ()) -> Sink BS.ByteString m FetchResult
processFeed config ownerId' processFn = do
    ...

postUrlSource :: MonadIO m => Config -> OwnerId -> Source (StateT FetchState m) BS.ByteString
postUrlSource config ownerId' = do
    ...

...
_ <- case dsMode config of
    DSFromFile ->
        runResourceT $ CB.sourceFile dumpFile $= CB.lines $$ processFeed config publicId' saveResult
    DSNormal -> do
        let postsFromUrlConduit = postUrlSource config publicId' $$ processFeed config publicId' saveResult
        fetchedPosts <- runStateT postsFromUrlConduit (FetchState 0 "")
        return $ fst fetchedPosts
...
```
StateT is used in the URL case so that each chunk is fetched at a new offset. When reading from a file, plain IO is enough: the lines are simply read sequentially from the dump.
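To make the shape of this pattern concrete without pulling in the conduit library, here is a minimal, self-contained sketch using only base and transformers. All names (processItems, fetchPage, FetchState-as-Int, the sample posts) are hypothetical stand-ins: one shared consumer is fed either from an in-memory "file" source or from a paginated "URL" source whose current offset lives in StateT, mirroring how processFeed is driven by either sourceFile or postUrlSource above.

```haskell
-- Hypothetical simplified sketch (no conduit): one consumer, two sources,
-- with the URL-style source tracking its offset in StateT.
import Control.Monad.Trans.State (StateT, evalStateT, get, put)
import Control.Monad.Trans.Class (lift)

-- The shared consumer (stands in for processFeed): here it just counts posts.
processItems :: Monad m => [String] -> m Int
processItems = return . length

-- "File" mode: all lines are already available (stands in for sourceFile/lines).
fromFile :: [String] -> IO Int
fromFile = processItems

-- Hypothetical paginated backend: returns up to 2 posts starting at an offset.
fetchPage :: Int -> [String]
fetchPage off = take 2 (drop off allPosts)
  where allPosts = ["p1", "p2", "p3", "p4", "p5"]

-- "URL" mode: fetch pages at increasing offsets until a page comes back empty;
-- the offset is the state (stands in for FetchState).
fromUrl :: IO Int
fromUrl = evalStateT loop 0
  where
    loop :: StateT Int IO Int
    loop = do
      off <- get
      let page = fetchPage off
      if null page
        then return 0
        else do
          put (off + length page)  -- advance the offset for the next fetch
          n    <- lift (processItems page)
          rest <- loop
          return (n + rest)

main :: IO ()
main = do
  a <- fromFile ["p1", "p2", "p3"]
  b <- fromUrl
  print (a, b)  -- prints (3,5)
```

The point of the design is that the consumer never knows where its input comes from; only the URL-style source needs the extra StateT layer, and the file path stays in plain IO.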