Transducers should not be a problem for the batching function: as a consumer on the in channel, it sees the values already transformed by any transducer on that channel, and any consumers listening on out will, in turn, see the values transformed by that channel's transducer.
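To illustrate the point about consumers seeing transformed values, here is a minimal sketch. It assumes clojure.core.async is on the classpath and aliased as async; the names in and seen and the (map inc) transducer are chosen only for this illustration.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical channel for illustration: buffer of 10 plus a transducer
;; that increments every value put on it. A channel must be buffered
;; in order to carry a transducer.
(def in (async/chan 10 (map inc)))

;; Put raw values on the channel, then close it.
(doseq [x [1 2 3]]
  (async/>!! in x))
(async/close! in)

;; Any consumer, including a batching go-loop, takes the transformed values.
(def seen (async/<!! (async/into [] in)))
;; seen is [2 3 4]
```

The transformation is applied as values pass through the channel, so a batching function reading from such a channel needs no special handling for it.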
As for the implementation, the function below takes batches of max-count items from in, or however many have arrived within max-time milliseconds since the last batch was output, and puts them on out. It stops when the input channel is closed, after flushing any buffered items. All of this is subject to the input channel's transducer, if any; consumers on out will likewise see that channel's transducer applied, as described above:
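    ;; Assumes (require '[clojure.core.async :as async])
    (defn batch [in out max-time max-count]
      (let [lim-1 (dec max-count)]
        (async/go-loop [buf [] t (async/timeout max-time)]
          (let [[v p] (async/alts! [in t])]
            (cond
              ;; Timeout fired: emit whatever has accumulated and restart the timer.
              (= p t)
              (do
                (async/>! out buf)
                (recur [] (async/timeout max-time)))

              ;; Input channel closed: flush any remaining items and stop.
              (nil? v)
              (when (seq buf)
                (async/>! out buf))

              ;; This value completes a batch of max-count items.
              (== (count buf) lim-1)
              (do
                (async/>! out (conj buf v))
                (recur [] (async/timeout max-time)))

              ;; Otherwise keep accumulating against the current timer.
              :else
              (recur (conj buf v) t))))))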