I have a database server that I extract data from. Some result sets have millions of rows or more, so I load them lazily, using server-side cursors from the clojure.jdbc library (https://funcool.imtqy.com/clojure.jdbc/latest/#cursor-queries).
Now I have a problem. I need to produce the first 500 elements of a lazy sequence, then have the program wait up to 10 minutes for a signal telling it to produce the next 500 elements, and so on until all the data has been fetched from the server. If the signal does not arrive within 10 minutes, the program should close the connection.
I wrote a sample:
;; collection of all unfinished lazy sequences waiting for a signal to continue producing elements
(def lazyseq_maps (atom {:seq_1 {:next_500 false :data nil}
                         :seq_2 {:next_500 false :data nil}}))

(jdbc/atomic conn
  (with-open [cursor (jdbc/fetch-lazy conn sql)]
    (let [lazyseq (jdbc/cursor->lazyseq cursor)]
      (swap! lazyseq_maps assoc seq_id {:next_500 true :data nil})
      (loop [lazyseq_rest lazyseq
             count 1]
        (if (:next_500 (seq_id @lazyseq_maps))
          (do
            (swap! lazyseq_maps update-in [seq_id :data] conj (first lazyseq_rest))
            (when (= 0 (mod count 500))
              (swap! lazyseq_maps assoc-in [seq_id :next_500] false))
            (recur (rest lazyseq_rest) (inc count)))
          ;; here I don't know how to write a function that waits for the signal to continue fetching data
          (func-for-waiting-signal))))
    (seq_id @lazyseq_maps)))
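How can I wait for such a signal in Clojure? Would core.async be suitable for this? And how can I close the connection if no signal arrives within 10 minutes?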
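To make the question concrete, here is a minimal sketch of the behaviour I am after, assuming core.async is on the classpath. It blocks on a signal channel with `alts!!` and a `timeout` channel between batches. The names `await-signal`, `consume-in-batches`, `signal-ch` and `handle-batch!` are hypothetical helpers made up for illustration, and an in-memory `(range 7)` stands in for the cursor-backed lazy sequence.

```clojure
(require '[clojure.core.async :as async])

(defn await-signal
  "Blocks until a value arrives on signal-ch or timeout-ms elapses.
  Returns true if a signal arrived, false on timeout or if signal-ch was closed."
  [signal-ch timeout-ms]
  (let [[v port] (async/alts!! [signal-ch (async/timeout timeout-ms)])]
    (and (= port signal-ch) (some? v))))

(defn consume-in-batches
  "Passes rows to handle-batch! in vectors of up to batch-size elements,
  waiting for a signal before each batch after the first.
  Returns :done when rows are exhausted, :timed-out if no signal arrived in time."
  [rows signal-ch batch-size timeout-ms handle-batch!]
  (loop [remaining (seq rows)]
    (if-not remaining
      :done
      (let [[batch more] (split-at batch-size remaining)]
        (handle-batch! (vec batch))
        (cond
          (empty? more)                       :done
          (await-signal signal-ch timeout-ms) (recur (seq more))
          :else                               :timed-out)))))

;; Demo with in-memory data: one signal is queued up front, so two batches
;; are produced and the third wait times out after 100 ms.
(def signal-ch (async/chan 1))
(def batches (atom []))
(async/>!! signal-ch :next)

(def result
  (consume-in-batches (range 7) signal-ch 3 100 #(swap! batches conj %)))
;; result   => :timed-out
;; @batches => [[0 1 2] [3 4 5]]
```

In the real code I would presumably call `consume-in-batches` inside the `with-open` from my sample, with a batch size of 500 and a timeout of `(* 10 60 1000)` ms, so that returning `:timed-out` leaves the `with-open` body and closes the cursor. Note that `alts!!` blocks the calling thread for the whole wait, which may or may not be acceptable here.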