Eventloop design for multi-threaded data queue environment

Description of the problem I'm currently trying to find a lispy / working solution for:

A job queue supplies a set of identical worker threads (they all run the same code) with tasks to execute. If the queue is empty, the threads must wait until a new entry is added, but I also want to ensure a clean shutdown. So even while the threads are waiting on the queue, the parent thread must be able to set some variable, or otherwise wake the threads, and tell them to shut down. The only reason a thread should not respond immediately is that it is currently evaluating a task and therefore cannot shut down cleanly until that task is finished.

I currently have two solutions that I'm not sure about:

    (defparameter *kill-yourself* nil)
    (defparameter *mutex* (sb-thread:make-mutex))
    (defparameter *notify* (sb-thread:make-waitqueue))

    #|the queue is thread safe|#
    (defparameter *job-queue* (make-instance 'queue))

    (defun fill-queue (with-data)
      (fill-stuff-in-queue)
      (sb-thread:with-mutex (*mutex*)
        (sb-thread:condition-notify *notify*)))

    #|solution A|#
    (with-mutex (*mutex*)
      (do ((curr-job nil))
          (*kill-yourself* nil)
        (if (is-empty *job-queue*)
            (sb-thread:condition-wait *notify* *mutex*)
            (progn
              (setf curr-job (dequeue *job-queue*))
              (do-stuff-with-job)))))

    #|solution B|#
    (defun helper-kill-yourself-p ()
      (sb-thread:with-mutex (*mutex*)
        *kill-yourself*))

    (do ((job (dequeue-* *job-queue* :timeout 0)
              (dequeue-* *job-queue* :timeout 0)))
        ((if (helper-kill-yourself-p)
             t
             (sb-thread:with-mutex (*mutex*)
               (sb-thread:condition-wait *notify* *mutex*)
               (if (helper-kill-yourself-p) t nil)))
         (progn nil))
      (do-stuff-with-job))

Either do loop could be used as the body of a worker thread. But A will not work with multiple threads (the mutex prevents any parallel work), and solution B looks, and is, rather dirty, because it can fail when the dequeued job is nil. I am also not convinced by the termination test, which is too long and seems overly complicated.

What is the proper way to implement a (do) loop that works on the data provided by the queue for as long as it is supposed to, and that sleeps while there is no new data and it has not yet been told to stop? And last but not least, it must be possible to run this (do) loop in any number of parallel threads.

2 answers

Solution A

Yes, you are right about solution A: the mutex prevents jobs from being processed in parallel.

Solution B

I think the do loop is not the right tool for this job. In particular, your code can fetch a job from the queue and then exit the thread without executing it. This can happen because you dequeue before the end test is checked. In addition, because you bind job in the do variable list, you discard the second value returned by dequeue, which is also bad because you then cannot reliably tell whether the queue was empty. Finally, if you check whether the thread should stop in the do end-test form, you need to acquire *mutex* twice: once to check whether the thread should stop and once to dequeue (or you could invent some strange end-test form that does the work of the loop body).

So, having said that, you would have to put all of the code inside the do body and leave the variable list and the end test empty. That is why I think loop is the better fit in this case.

If you have to use do, you can simply wrap the loop body in it, e.g. (do nil (nil nil) *loop-body*).
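As a small illustration of the two points above (wrapping the body in a do with no variables, and using the second value of dequeue to detect an empty queue), here is a minimal sketch. It assumes SBCL with the sb-concurrency contrib; drain-with-do is a hypothetical helper name introduced only for this example.

```lisp
;; Assumes SBCL. DRAIN-WITH-DO is a hypothetical helper, not part of the answer.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

(defun drain-with-do (queue)
  "Dequeue items until QUEUE is empty and return them in FIFO order."
  (let ((seen '()))
    ;; No loop variables and an end test that is never true:
    ;; the body alone decides when to leave, via RETURN.
    (do () (nil)
      (multiple-value-bind (item has-item) (sb-concurrency:dequeue queue)
        ;; HAS-ITEM, not ITEM, says whether the queue was empty,
        ;; so an item that happens to be NIL is still handled correctly.
        (unless has-item
          (return (nreverse seen)))
        (push item seen)))))

;; Usage: a queue holding 1, NIL and 3.
(let ((q (sb-concurrency:make-queue)))
  (dolist (x '(1 nil 3))
    (sb-concurrency:enqueue x q))
  (drain-with-do q))   ; => (1 NIL 3)
```

Because the empty-queue check uses the secondary value, a nil job no longer looks like "nothing to do", which is the failure mode mentioned in the question.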

My solution

    (require :sb-concurrency)
    (use-package :sb-concurrency)
    (use-package :sb-thread)

    (defparameter *kill-yourself* nil)
    (defparameter *mutex* (make-mutex))
    (defparameter *notify* (make-waitqueue))

    #|the queue is thread safe|#
    (defparameter *job-queue* (make-queue :name "job-queue"))
    (defparameter *timeout* 10)
    (defparameter *output-lock* (make-mutex))

    (defun output (line)
      (with-mutex (*output-lock*)
        (write-line line)))

    (defun fill-queue (with-data)
      (enqueue with-data *job-queue*)
      (with-mutex (*mutex*)
        (condition-notify *notify*)))

    (defun process-job (thread-name job)
      (funcall job thread-name))

    (defun run-worker (name)
      (make-thread
       (lambda ()
         (output (format nil "starting thread ~a" name))
         (loop
           (with-mutex (*mutex*)
             (condition-wait *notify* *mutex* :timeout *timeout*)
             (when *kill-yourself*
               (output (format nil "~a thread quitting" name))
               (return-from-thread nil)))
           ;; release *mutex* before starting the job,
           ;; otherwise it won't allow other threads wait for new jobs
           ;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
           ;; since inbetween queue can become empty
           (multiple-value-bind (job has-job) (dequeue *job-queue*)
             (if has-job
                 (process-job name job)))))
       :name name))

    (defun stop-work ()
      (with-mutex (*mutex*)
        (setf *kill-yourself* t)
        (condition-broadcast *notify*)))

    (defun add-job (job)
      ;; no need to put enqueue in critical section
      (enqueue job *job-queue*)
      (with-mutex (*mutex*)
        (condition-notify *notify*)))

    (defun make-job (n)
      (lambda (thread-name)
        (loop for i upto 1000 collecting i)
        (output (format nil "~a thread executes ~a job" thread-name n))))

    (defun try-me ()
      (run-worker "worker1")
      (run-worker "worker2")
      (loop for i upto 1000 do (add-job (make-job i)))
      (loop for i upto 2000 collecting i)
      (stop-work))

Calling try-me in the REPL should give you output similar to the following:

    starting thread worker1
    worker1 thread executes 0 job
    worker1 thread executes 1 job
    worker1 thread executes 2 job
    worker1 thread executes 3 job
    starting thread worker2
    worker2 thread executes 4 job
    worker1 thread executes 5 job
    worker2 thread executes 6 job
    worker1 thread executes 7 job
    worker1 thread executes 8 job
    ...
    worker2 thread executes 33 job
    worker1 thread executes 34 job
    worker2 thread executes 35 job
    worker1 thread executes 36 job
    worker1 thread executes 37 job
    worker2 thread executes 38 job
    0
    worker1 thread executes 39 job
    worker2 thread quitting
    worker1 thread quitting

P.S. I could not find documentation for the old SBCL version, so I leave porting this to the old API up to you. Hope this helps.
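One thing to be aware of in run-worker: each pass through the loop waits on the condition variable before it looks at the queue, so a worker handles at most one job per wake-up and depends on *timeout* to pick up jobs whose notification arrived while it was busy. A possible variant, offered only as a sketch and not as the answer's method, checks the queue first and waits only when it is empty. It assumes SBCL with thread support; all names here (*lock*, *wakeup*, *jobs*, *stop*, next-job, worker, submit, shutdown) are hypothetical, and jobs are assumed to be functions of no arguments, so nil can signal shutdown.

```lisp
;; Assumes SBCL with threads. All names below are hypothetical.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

(defvar *lock* (sb-thread:make-mutex :name "job lock"))
(defvar *wakeup* (sb-thread:make-waitqueue))
(defvar *jobs* (sb-concurrency:make-queue))
(defvar *stop* nil)

(defun next-job ()
  "Block until a job is available; return it, or NIL once shutdown is requested."
  (sb-thread:with-mutex (*lock*)
    (loop
      (when *stop*
        (return nil))
      (multiple-value-bind (job has-job) (sb-concurrency:dequeue *jobs*)
        (when has-job
          (return job)))
      ;; Queue is empty and no stop was requested: sleep until notified.
      ;; CONDITION-WAIT releases *LOCK* while waiting and re-acquires it
      ;; before returning, after which both conditions are checked again.
      (sb-thread:condition-wait *wakeup* *lock*))))

(defun worker ()
  "Run jobs outside the lock until NEXT-JOB reports shutdown."
  (loop for job = (next-job)
        while job
        do (funcall job)))

(defun submit (job)
  "Enqueue JOB (a function of no arguments) and wake one waiting worker."
  (sb-concurrency:enqueue job *jobs*)
  (sb-thread:with-mutex (*lock*)
    (sb-thread:condition-notify *wakeup*)))

(defun shutdown ()
  "Ask every worker to exit after its current job."
  (sb-thread:with-mutex (*lock*)
    (setf *stop* t)
    (sb-thread:condition-broadcast *wakeup*)))
```

Workers would be started with (sb-thread:make-thread #'worker) and joined with sb-thread:join-thread after calling shutdown. Note that in this sketch jobs still in the queue when shutdown is called are left unprocessed; whether that is desirable depends on what "clean shutdown" should mean for your application.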

Edit: class-based solution

In the comments on your (now deleted) answer, it turned out that you want a class for the event loop. I came up with the following:

    (defclass event-loop ()
      ((lock :initform (make-mutex))
       (queue :initform (make-waitqueue))
       (jobs :initform (make-queue))
       (stopped :initform nil)
       (timeout :initarg :wait-timeout :initform 0)
       (process-job :initarg :process-job :initform #'identity)
       (worker-count :initarg :worker-count
                     :initform (error "Must supply worker count"))))

    (defmethod initialize-instance :after ((eloop event-loop) &key)
      (with-slots (worker-count timeout lock queue jobs process-job stopped) eloop
        (dotimes (i worker-count)
          (make-thread
           (lambda ()
             (loop
               (with-mutex (lock)
                 (condition-wait queue lock :timeout timeout)
                 (when stopped
                   (return-from-thread nil)))
               ;; release *mutex* before starting the job,
               ;; otherwise it won't allow other threads wait for new jobs
               ;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
               ;; since inbetween queue can become empty
               (multiple-value-bind (job has-job) (dequeue jobs)
                 (if has-job
                     (funcall process-job job)))))))))

    (defun push-job (job event-loop)
      (with-slots (lock queue jobs) event-loop
        (enqueue job jobs)
        (with-mutex (lock)
          (condition-notify queue))))

    (defun stop-loop (event-loop)
      (with-slots (lock queue stopped) event-loop
        (with-mutex (lock)
          (setf stopped t)
          (condition-broadcast queue))))

You can use it like this:

    > (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
    > (defparameter *oq* (make-queue))
    > (dotimes (i 100)
        (push-job (let ((n i))
                    (lambda ()
                      (sleep 1)
                      (enqueue (format nil "~a job done" n) *oq*)))
                  *el*))

An sb-concurrency queue is used for the output to avoid garbled results. While this is running, you can inspect *oq* in your REPL.

    > *oq*
    #S(QUEUE :HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
                    "6 job done" "2 job done" "11 job done" "10 job done"
                    "16 job done" "12 job done" "4 job done" "3 job done"
                    "17 job done" "5 job done" "0 job done" "8 job done"
                    "14 job done" "25 job done" "15 job done" "21 job done"
                    "28 job done" "13 job done" "23 job done" "22 job done"
                    "19 job done" "27 job done" "18 job done")
       :TAIL ("18 job done") :NAME NIL)
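Printing the queue structure directly also shows its internal placeholder cell, as in the output above. If you only want the results, sb-concurrency provides list-queue-contents and queue-count for taking a snapshot. A minimal sketch, assuming SBCL; queue-snapshot is a hypothetical helper name:

```lisp
;; Assumes SBCL. QUEUE-SNAPSHOT is a hypothetical helper.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

(defun queue-snapshot (queue)
  "Return the current contents of QUEUE as a list, and its length as a second value.
The two values are read separately, so they may disagree while workers are still enqueuing."
  (values (sb-concurrency:list-queue-contents queue)
          (sb-concurrency:queue-count queue)))
```

In the session above you would call (queue-snapshot *oq*) to see the finished jobs without the structure wrapper.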

I used the chanl library, which provides a message queue mechanism. When I wanted the threads to shut down, I just sent the keyword :stop to the queue. Of course, this does not stop a thread until everything queued ahead of :stop has been processed. If you want to stop sooner, you can add a second queue (a control queue) that is checked before the data queue.
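The :stop sentinel idea can be sketched without chanl. The example below swaps in sb-concurrency's mailbox (a blocking message queue that ships with SBCL) in place of a chanl channel, so it illustrates the technique and not chanl's API. The names run-mailbox-worker and demo-stop-sentinel are hypothetical.

```lisp
;; Assumes SBCL with threads. Uses sb-concurrency mailboxes instead of chanl.
;; RUN-MAILBOX-WORKER and DEMO-STOP-SENTINEL are hypothetical names.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

(defun run-mailbox-worker (mailbox handler)
  "Start a thread that passes each message to HANDLER until it receives :STOP."
  (sb-thread:make-thread
   (lambda ()
     ;; RECEIVE-MESSAGE blocks while the mailbox is empty.
     (loop for message = (sb-concurrency:receive-message mailbox)
           until (eq message :stop)
           do (funcall handler message)))))

(defun demo-stop-sentinel (&key (workers 2) (jobs 10))
  "Process JOBS integers with WORKERS threads; return how many were handled."
  (let* ((mailbox (sb-concurrency:make-mailbox))
         (done (sb-concurrency:make-queue))
         (threads (loop repeat workers
                        collect (run-mailbox-worker
                                 mailbox
                                 (lambda (n) (sb-concurrency:enqueue n done))))))
    (dotimes (i jobs)
      (sb-concurrency:send-message mailbox i))
    ;; Each worker consumes exactly one :STOP, so send one per worker.
    (loop repeat workers
          do (sb-concurrency:send-message mailbox :stop))
    (mapc #'sb-thread:join-thread threads)
    (sb-concurrency:queue-count done)))
```

With several workers sharing one queue, a single :stop only ends one of them, which is why the sketch sends one sentinel per worker. As the answer notes, everything queued ahead of the sentinels is still processed first.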


Source: https://habr.com/ru/post/1501218/

