Clojure: alternative to using a mutex/lock and a counter

Scenario: I have a server listening on six active TCP/IP connections. When a "ready" message comes in, an event is raised on its own thread. Once the server has received a "ready" message from each connection, it needs to run the "start" function.

My object-oriented solution would probably involve a mutex and a counter. Something like:

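int _countDown = 6;
readonly object _lock = new object(); // must be initialized before lock() can use it

void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
    lock (_lock) {
        --_countDown; // one fewer connection still pending
        if (_countDown == 0) Start();
    }
}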

How would I solve this problem in Clojure without resorting to locks/mutexes?

+4
3 answers

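One way to do this in Clojure is with a promise and an atom: a watch on the atom delivers the promise once the count reaches six.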

(def wait-barrier (promise))
(def conn-count (atom 0))

(add-watch conn-count :barrier-watch
           (fn [key ref old-state new-state]
             (when (== new-state 6)
               (deliver wait-barrier :go))))  

A contrived example:

(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
  (when (= @wait-barrier :go)
    (println "I'm a worker")))

(defn dummy-receive-msg []
  (doall (repeatedly 6,
                     (fn []
                       (println "received msg")
                       (swap! conn-count inc)))))

(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
  (add-watch conn-count :barrier-watch
             (fn [key ref old-state new-state]
               (when (== new-state 6)
                 (deliver wait-barrier :go))))
  (dummy-receive-msg)
  (doall (map deref workers)))
+5

You can use a CountDownLatch or a Phaser for this.

A CountDownLatch is the simpler of the two. A Phaser can also be used from tasks running in a ForkJoinPool, which may not matter in your case.

With a latch, it would look something like this:

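(import 'java.util.concurrent.CountDownLatch)

;; on-receive-message and this stand in for your own handler registration.
(let [latch (CountDownLatch. 6)]
  (on-receive-message this (fn [_] (.countDown latch)))
  (.await latch)) ; blocks until all six have counted down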

...or something along those lines.
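
To make the latch idea concrete, here is a minimal self-contained sketch that needs only the JDK. The helper name run-when-all-ready and the plain threads standing in for the six connections are assumptions for illustration and are not part of the answer above; in a real server, each connection's "ready" handler would call .countDown instead.

```clojure
(import 'java.util.concurrent.CountDownLatch)

(defn run-when-all-ready
  "Hypothetical helper: simulates n connections, each reporting ready once,
  and calls start-fn only after all of them have done so."
  [n start-fn]
  (let [latch (CountDownLatch. n)]
    ;; Each thread stands in for one connection's \"ready\" handler.
    (dotimes [_ n]
      (.start (Thread. (fn []
                         (Thread/sleep 50)
                         (.countDown latch)))))
    ;; Blocks the calling thread until the count reaches zero.
    (.await latch)
    (start-fn)))

(run-when-all-ready 6 (fn [] (println "start")))
```

Each connection must count down only once: a repeated "ready" from the same connection would release the latch early, so the "first time from a connection" check in the question still has to happen before .countDown is called.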

+6

Another option: core.async. Here is an MCVE:

(require '[clojure.core.async :refer [chan thread >!! <!!]])

(let [conn-count 6
      ready-chan (chan)]

  ;; Spawn a thread for each connection.
  (doseq [conn-id (range conn-count)]
    (thread
      (Thread/sleep (long (rand-int 2000))) ; explicit long avoids a reflective call on JDK 19+
      (>!! ready-chan conn-id)))

  ;; Block until all connections are established.
  (doseq [total (range 1 (inc conn-count))]
    (println (<!! ready-chan) "connected," total "overall"))

  ;; Invoke start afterwards.
  (println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil

Alternatively (credit to Christophe Grand):

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))
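
A possible way to use that channel is sketched below, assuming core.async is on the classpath. The helper wait-for-ready and the simulated connections are hypothetical names added for illustration; count-down-latch-chan is repeated from above so the snippet runs on its own. The transducer silently drops the first n-1 puts, lets the n-th through, and then closes the channel, so a single blocking take acts as the barrier.

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))

(defn wait-for-ready
  "Hypothetical helper: n simulated connections each put one message on the
  latch channel; returns :start once the n-th message has arrived."
  [n]
  (let [latch (count-down-latch-chan n)]
    (doseq [conn-id (range n)]
      (thread
        (Thread/sleep (long (* 10 conn-id)))
        (>!! latch conn-id)))
    ;; Blocks until the n-th put makes it past the transducer.
    (<!! latch)
    :start))

(println (wait-for-ready 6))
```

Compared with the MCVE above, the counting lives inside the channel, so the consumer needs one take instead of a loop over all six messages.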

For a brief introduction to core.async, check out this Gist. For a longer one, read the corresponding chapter in "Clojure for the Brave and True".

+1

Source: https://habr.com/ru/post/1628400/

