How to send messages correctly using core.async?

I would like to send messages to core.async chan by count and timeout (i.e. 10 ms or 10 messages, whichever comes first). Tim Baldridge has a batch video , but uses legacy features in core.async and doesn't use converters. I'm looking for something like the following ...

(defn batch [in out max-time max-count] ... ) 
+5
source share
1 answer

Converters should not be a problem for the batch processing function - as a receiver on the in channel, it will see the values ​​converted by any converters on this channel, and any receiving signals listening out , in turn, will see the values ​​converted by this channel converter.

As for implementation, the function below will take batches of max-count elements from in , or how many of them come with max-time , since the last batch was output, and output them to out , closing when the input channel is closed, provided that the input channel converter (if any, and any listeners listening out will also have this channel sensor, as described above):

 (defn batch [in out max-time max-count] (let [lim-1 (dec max-count)] (async/go-loop [buf [] t (async/timeout max-time)] (let [[vp] (async/alts! [in t])] (cond (= pt) (do (async/>! out buf) (recur [] (async/timeout max-time))) (nil? v) (if (seq buf) (async/>! out buf)) (== (count buf) lim-1) (do (async/>! out (conj buf v)) (recur [] (async/timeout max-time))) :else (recur (conj buf v) t)))))) 
+11
source

Source: https://habr.com/ru/post/1235582/


All Articles