Solution A
Yes, you are right about solution A: the mutex will not allow jobs to run in parallel.
Solution B
I think the do loop is not the right tool for this job. In particular, in your code it is possible for a job to be fetched from the queue and for the thread to exit without executing it. This can happen because you dequeue before the end-test check. In addition, since you define job in the variables block of do, you ignore the multiple values returned from dequeue, which is also bad because you cannot reliably tell whether the queue was empty. Finally, if you check whether to stop the thread in the end-test-form of do, you will have to acquire *mutex* twice: once to check whether the thread should stop and once to dequeue (or you could invent a strange end-test-form that does the work of the loop body).
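To make the point about multiple values concrete, here is a minimal sketch. It assumes SBCL with the sb-concurrency contrib, where dequeue returns the item as its primary value and a "was there an item" flag as its secondary value. The helper names dequeue-single and dequeue-both are hypothetical and only serve the illustration.

```lisp
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

;; Hypothetical helper: a plain variable binding (as in the variables
;; block of do, or in let) keeps only the primary value of dequeue.
(defun dequeue-single (queue)
  (let ((job (sb-concurrency:dequeue queue)))
    job))

;; Hypothetical helper: multiple-value-bind also captures the secondary
;; value, which says whether an item was actually present.
(defun dequeue-both (queue)
  (multiple-value-bind (job has-job) (sb-concurrency:dequeue queue)
    (list job has-job)))

(defun demo ()
  (let ((q (sb-concurrency:make-queue)))
    (sb-concurrency:enqueue nil q)       ; an item that happens to be NIL
    (list (dequeue-both q)               ; item was present
          (dequeue-both q)               ; queue is now empty
          (dequeue-single q))))          ; NIL, but for which reason?

(demo) ; => ((NIL T) (NIL NIL) NIL)
```

With a single binding, a NIL item and an empty queue look the same, so the secondary value is the only reliable emptiness check.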
So, having said that, you would have to put all the code inside the body of do and leave the variables and the end test empty. That is why I think loop is the better choice in this case.
If you have to use a do loop, you can easily wrap the loop body in it, for example: (do nil (nil nil) *loop-body*).
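As a small illustration of that wrapper, the sketch below uses do with no variables and an end test that is never true, so the only way out is an explicit return (do establishes an implicit nil block). The function count-to is a hypothetical stand-in for a real loop body.

```lisp
;; Hypothetical example: a do form used as a bare infinite loop.
(defun count-to (limit)
  (let ((n 0))
    (do () (nil)               ; no variables; the end test NIL never succeeds
      (incf n)
      (when (>= n limit)
        (return n)))))         ; leaves the implicit NIL block of do

(count-to 3) ; => 3
```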
My solution
    (require :sb-concurrency)
    (use-package :sb-concurrency)
    (use-package :sb-thread)

    (defparameter *kill-yourself* nil)
    (defparameter *mutex* (make-mutex))
    (defparameter *notify* (make-waitqueue))
    #|the queue is thread safe|#
    (defparameter *job-queue* (make-queue :name "job-queue"))
    (defparameter *timeout* 10)
    (defparameter *output-lock* (make-mutex))

    (defun output (line)
      (with-mutex (*output-lock*)
        (write-line line)))

    (defun fill-queue (with-data)
      (enqueue with-data *job-queue*)
      (with-mutex (*mutex*)
        (condition-notify *notify*)))

    (defun process-job (thread-name job)
      (funcall job thread-name))

    (defun run-worker (name)
      (make-thread
       (lambda ()
         (output (format nil "starting thread ~a" name))
         (loop
           (with-mutex (*mutex*)
             (condition-wait *notify* *mutex* :timeout *timeout*)
             (when *kill-yourself*
               (output (format nil "~a thread quitting" name))
               (return-from-thread nil)))
           ;; release *mutex* before starting the job,
           ;; otherwise it won't allow other threads to wait for new jobs

           ;; you don't want to make 2 separate calls (queue-empty-p, dequeue)
           ;; since the queue can become empty in between
           (multiple-value-bind (job has-job) (dequeue *job-queue*)
             (if has-job
                 (process-job name job)))))
       :name name))

    (defun stop-work ()
      (with-mutex (*mutex*)
        (setf *kill-yourself* t)
        (condition-broadcast *notify*)))

    (defun add-job (job)
      ;; no need to put enqueue in critical section
      (enqueue job *job-queue*)
      (with-mutex (*mutex*)
        (condition-notify *notify*)))

    (defun make-job (n)
      (lambda (thread-name)
        (loop for i upto 1000 collecting i)
        (output (format nil "~a thread executes ~a job" thread-name n))))

    (defun try-me ()
      (run-worker "worker1")
      (run-worker "worker2")
      (loop for i upto 1000 do
        (add-job (make-job i)))
      (loop for i upto 2000 collecting i)
      (stop-work))
Calling (try-me) in the REPL should give you output similar to the following:
    starting thread worker1
    worker1 thread executes 0 job
    worker1 thread executes 1 job
    worker1 thread executes 2 job
    worker1 thread executes 3 job
    starting thread worker2
    worker2 thread executes 4 job
    worker1 thread executes 5 job
    worker2 thread executes 6 job
    worker1 thread executes 7 job
    worker1 thread executes 8 job
    ...
    worker2 thread executes 33 job
    worker1 thread executes 34 job
    worker2 thread executes 35 job
    worker1 thread executes 36 job
    worker1 thread executes 37 job
    worker2 thread executes 38 job
    0
    worker1 thread executes 39 job
    worker2 thread quitting
    worker1 thread quitting
P.S. I could not find documentation for the old SBCL version, so I leave translating this to the old API up to you. Hope this helps.
Edit: class-based solution
In the comments on your (deleted) answer, we found out that you want a class for the event loop. I came up with the following:
    (defclass event-loop ()
      ((lock :initform (make-mutex))
       (queue :initform (make-waitqueue))
       (jobs :initform (make-queue))
       (stopped :initform nil)
       (timeout :initarg :wait-timeout :initform 0)
       (process-job :initarg :process-job :initform
You can use it like this:
    > (defparameter *el* (make-instance 'event-loop :worker-count 10 :process-job #'funcall))
    > (defparameter *oq* (make-queue))
    > (dotimes (i 100)
        (push-job (let ((n i))
                    (lambda ()
                      (sleep 1)
                      (enqueue (format nil "~a job done" n) *oq*)))
                  *el*))
A thread-safe queue (the sb-concurrency queue created with make-queue) is used for the output to avoid garbled results. While the jobs are running, you can inspect *oq* in your REPL.
    > *oq*
    #S(QUEUE :HEAD (SB-CONCURRENCY::.DUMMY. "7 job done" "1 job done" "9 job done"
                    "6 job done" "2 job done" "11 job done" "10 job done"
                    "16 job done" "12 job done" "4 job done" "3 job done"
                    "17 job done" "5 job done" "0 job done" "8 job done"
                    "14 job done" "25 job done" "15 job done" "21 job done"
                    "28 job done" "13 job done" "23 job done" "22 job done"
                    "19 job done" "27 job done" "18 job done")
               :TAIL ("18 job done") :NAME NIL)
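If reading the printed structure is inconvenient, the results can also be read through the queue's own accessors. The sketch below assumes the sb-concurrency functions queue-count, list-queue-contents, queue-empty-p and dequeue. The helper drain-queue is a hypothetical name, not part of the code above.

```lisp
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require :sb-concurrency))

;; Hypothetical helper: remove everything currently in QUEUE and
;; return it as a list, oldest item first.
(defun drain-queue (queue)
  (let ((items '()))
    (loop
      (multiple-value-bind (item presentp) (sb-concurrency:dequeue queue)
        (unless presentp
          (return (nreverse items)))
        (push item items)))))

(defun demo-inspect ()
  (let ((q (sb-concurrency:make-queue)))
    (sb-concurrency:enqueue "0 job done" q)
    (sb-concurrency:enqueue "1 job done" q)
    (list (sb-concurrency:queue-count q)            ; number of items
          (sb-concurrency:list-queue-contents q)    ; non-destructive snapshot
          (drain-queue q)                           ; destructive read
          (sb-concurrency:queue-empty-p q))))

(demo-inspect)
;; => (2 ("0 job done" "1 job done") ("0 job done" "1 job done") T)
```

While workers are still enqueuing, the count and the snapshot are only a view of a moment in time, so they are best treated as approximate until the event loop has finished.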