What is the correct way to perform side effects when swapping a Clojure atom?

I keep a registry of processes in an atom.

I want to start one and only one process (specifically, a core.async go-loop) per id.

However, you should not perform side effects inside swap!, so this code is not suitable:

(swap! processes-atom
       (fn [processes]
         (if (get processes id)
           processes ;; already exists, do nothing
           (assoc processes id (create-process! id)))))
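
The reason this is unsafe is that swap! may call its update function more than once when the atom changes during an attempt. A minimal sketch of that retry, assuming hypothetical names `state` and `calls` and simulating the concurrent writer with a reset! on the first attempt:

```clojure
;; Hypothetical names: `state` and `calls` exist only for this illustration.
(def state (atom 0))
(def calls (atom 0))

(swap! state
       (fn [v]
         ;; Count every invocation of the update function.
         (when (= 1 (swap! calls inc))
           ;; On the first attempt only, simulate another thread changing
           ;; the atom, so this attempt's compare-and-set fails.
           (reset! state 100))
         (inc v)))

(println @state @calls) ; prints: 101 2
```

The update function ran twice for a single swap!, so a create-process! call placed inside it could start two processes for the same id.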

How do I do this correctly?

I looked at locking, which takes an object to use as the monitor. I would prefer that each id (they are dynamic) has its own lock.

+5
5 answers

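You can do this with locking, as OlegTheCat shows. There is also a way to avoid holding a lock for as long as it takes to create a process: instead of a map from pid to process, keep a map from pid to a delay of the process. Adding a delay is cheap and has no side effects, so it is safe inside swap!; the process is only created when the delay is dereferenced, outside the call to swap!. A delay runs its body at most once, so threads that need the same process get the same one, while threads that want a different process are not blocked by it.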

Here is a sample implementation, along with example definitions of the other vars needed to run the code as-is:

(def process-results (atom []))
(defn create-process! [id]
  ;; pretend creating the process takes a long time
  (Thread/sleep (* 1000 (rand-int 3)))
  (future
    ;; running it takes longer, but happens on a new thread
    (Thread/sleep (* 1000 (rand-int 10)))
    (swap! process-results conj id)))

(def processes-atom (atom {}))
(defn cached-process [id]
  (-> processes-atom
      (swap! (fn [processes]
               (update processes id #(or % (delay (create-process! id))))))
      (get id)
      (deref)))

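To check that this works, call cached-process many times and verify that each process is created only once. Here is a small stress test: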

(defn stress-test [num-processes]
  (reset! process-results [])
  (reset! processes-atom {})
  (let [running-processes (doall (for [i (range num-processes)]
                                   (cached-process (rand-int 10))))]
    (run! deref running-processes)
    (deref process-results)))

user> (time (stress-test 40))
"Elapsed time: 18004.617869 msecs"
[1 5 2 0 9 7 8 4 3 6]
+2

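It seems you need to protect processes-atom from concurrent modification, so that only one thread at a time can access it. locking will do that. And since locking already takes care of thread safety, you can use a volatile instead of an atom (a volatile is faster, but provides no atomicity guarantees of its own).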

Putting that together, something like this:

(def processes-volatile (volatile! {}))

(defn create-and-save-process! [id]
  (locking processes-volatile
    (vswap! processes-volatile
            (fn [processes]
              (if (get processes id)
                processes
                (assoc processes id (create-process! id)))))))
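
The lock above is global, while the question asked for one lock per id. A rough sketch of how that might look, still using locking but with one monitor object per id; `locks`, `lock-for`, `get-or-create-process!` and the stubbed `create-process!` are hypothetical names introduced for this example, and it uses an atom rather than the volatile above:

```clojure
;; Hypothetical stub; `created` just records which ids were started.
(def created (atom []))
(defn create-process! [id]
  (swap! created conj id)
  {:id id})

(def processes-atom (atom {}))
(def locks (atom {})) ; id -> monitor object

(defn lock-for
  "Returns the monitor for id, registering one on first use."
  [id]
  ;; Allocating an Object is harmless if swap! retries; only the
  ;; monitor that ends up in the map is ever returned.
  (get (swap! locks update id #(or % (Object.))) id))

(defn get-or-create-process!
  [id]
  (or (get @processes-atom id)
      (locking (lock-for id)
        ;; Re-check under the lock: another thread may have won the race.
        (or (get @processes-atom id)
            (let [process (create-process! id)]
              (swap! processes-atom assoc id process)
              process)))))
```

Threads asking for different ids take different monitors, so a slow create-process! for one id does not block the others.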
+3

Another option is to use an agent to start each process:

(require '[clojure.pprint :refer [pprint]])

(defn start-proc-agent
  [state]
  (let [delay (int (* 2000 (rand)))]
    (println (format "starting %d" (:id state)))
    (Thread/sleep delay)
    (println (format "finished %d" (:id state)))
    (merge state {:delay delay :state :running} )))

(def procs-agent (atom {}))
(dotimes [i 3]
  (let [curr-agent (agent {:id i :state :unstarted})]
    (swap! procs-agent assoc i curr-agent)
    (send curr-agent start-proc-agent )))
(println "all dispatched...")
(pprint @procs-agent)

(Thread/sleep 3000)
(pprint @procs-agent)

Output:

starting 2
starting 1
starting 0
all dispatched...
{0 #<Agent@39d8240b: {:id 0, :state :unstarted}>,
 1 #<Agent@3a6732bc: {:id 1, :state :unstarted}>,
 2 #<Agent@7414167a: {:id 2, :state :unstarted}>}
finished 0
finished 1
finished 2
{0 #<Agent@39d8240b: {:id 0, :state :running, :delay 317}>,
 1 #<Agent@3a6732bc: {:id 1, :state :running, :delay 1635}>,
 2 #<Agent@7414167a: {:id 2, :state :running, :delay 1687}>}

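So the agents in the procs-agent map are still :unstarted when first printed, because send returns immediately. After the sleep (3 seconds), all of them have finished starting and are :running.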


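Alternatively, you can use a single agent (instead of one per process) that holds the whole registry. Actions sent to an agent run one at a time and, unlike the function passed to swap!, are never retried. That makes it safe to check the state and start the process in the same action. For example: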

(defn start-proc-once-only
  [state i]
  (let [curr-proc (get state i) ]
    (if (= :running (:state curr-proc))
      (do
        (println "skipping restart of" i)
        state)
      (let [delay (int (* 2000 (rand)))]
        (println (format "starting %d" i))
        (Thread/sleep delay)
        (println (format "finished %d" i))
        (assoc state i {:delay delay :state :running})))))

(def procs (agent {}))
(dotimes [i 3]
  (println :starting i)
  (send procs start-proc-once-only i))
(dotimes [i 3]
  (println :starting i)
  (send procs start-proc-once-only i))

(println "all dispatched...")
(println :procs) (pprint @procs)
(Thread/sleep 5000)
(println :procs) (pprint @procs)

:starting 0
:starting 1
:starting 2
starting 0
:starting 0
:starting 1
:starting 2
all dispatched...
:procs
{}
finished 0
starting 1
finished 1
starting 2
finished 2
skipping restart of 0
skipping restart of 1
skipping restart of 2
:procs
{0 {:delay 1970, :state :running},
 1 {:delay 189, :state :running},
 2 {:delay 1337, :state :running}}
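
Instead of sleeping for a fixed time, the caller can block with await until every action sent so far has run. A minimal sketch, assuming a hypothetical stub `create-process!` and a hypothetical action `start-once`:

```clojure
;; Hypothetical stub; `created` records each real start.
(def created (atom []))
(defn create-process! [id]
  (swap! created conj id)
  {:id id :state :running})

(def procs (agent {}))

(defn start-once
  "Agent action: starts the process for id unless one is registered."
  [state id]
  (if (contains? state id)
    state
    (assoc state id (create-process! id))))

;; Request every id twice; the second round is skipped by start-once.
(dotimes [_ 2]
  (dotimes [i 3]
    (send procs start-once i)))

;; Blocks until all actions dispatched above from this thread have completed.
(await procs)
(println @created)
```

If starting a process blocks for a long time, send-off is usually a better fit than send, since send runs actions on a fixed-size thread pool.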
0

I think you should use add-watch. The watch function is called once for each change to the atom. In the watch-fn, check whether a new id has been added to the atom; if so, create a process and add it to the atom. That triggers another call to the watch-fn, but the second call will not find a new id that needs a process.
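
A minimal sketch of this idea, under some assumptions: ids are first reserved with a `:pending` placeholder, and `request-process!`, the watch key `:start-new-processes` and the stubbed `create-process!` are hypothetical names chosen for the example:

```clojure
;; Hypothetical stub standing in for the real process creation.
(def created (atom []))
(defn create-process! [id]
  (swap! created conj id)
  {:id id})

(def processes-atom (atom {}))

(add-watch processes-atom :start-new-processes
  (fn [_key registry old-state new-state]
    ;; Act only on ids that this particular change introduced.
    (doseq [id (keys new-state)
            :when (not (contains? old-state id))]
      ;; Runs outside the update function, so it is not retried.
      (swap! registry assoc id (create-process! id)))))

(defn request-process!
  [id]
  ;; Pure update: only reserves the id with a :pending placeholder.
  (swap! processes-atom
         (fn [processes]
           (if (contains? processes id)
             processes
             (assoc processes id :pending))))
  (get @processes-atom id))
```

Only the change that introduces an id triggers creation, so each process is started once. One caveat of this sketch: a second thread requesting the same id while the process is still being created may see `:pending` instead of the process.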

0

I prefer to use a channel:

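(require '[clojure.core.async :refer [chan go-loop <! >! >!! <!!]])

(defn create-process! [id] {:id id})

(def ^:private processes-channel (chan))

;; A single go-loop owns the registry, so requests are handled one at a time.
(go-loop [processes {}]
  (let [[id reply-channel] (<! processes-channel)
        process (if (contains? processes id)
                  (get processes id)
                  (create-process! id))]
    ;; Reply on a per-request channel, so concurrent callers cannot
    ;; take each other's messages off the shared channel.
    (>! reply-channel process)
    (recur (assoc processes id process))))

(defn get-process-by-id
  "Public API"
  [id]
  (let [reply-channel (chan)]
    (>!! processes-channel [id reply-channel])
    (<!! reply-channel)))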
0

Source: https://habr.com/ru/post/1665995/

