Is this an idiomatic workflow pool in Go?

I am trying to write a simple working pool with goroutines.

  • Is the code I wrote idiomatic? If not, what should change?
  • I want to set the maximum number of workflows to 5 and block until the employee becomes available if all 5 are busy. How can I extend this to have only 5 workers? Do I create static 5 goroutines and give everyone a work_channel ?

code:

 package main import ( "fmt" "math/rand" "sync" "time" ) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) o <- work + fmt.Sprintf("-%dms", sleepMs) } func main() { var work_channel = make(chan string) var results_channel = make(chan string) // create goroutine per item in work_channel go func() { var c = 0 var wg sync.WaitGroup for work := range work_channel { wg.Add(1) go worker(fmt.Sprintf("%d", c), work, results_channel, &wg) c++ } wg.Wait() fmt.Println("closing results channel") close(results_channel) }() // add work to the work_channel go func() { for c := 'a'; c < 'z'; c++ { work_channel <- fmt.Sprintf("%c", c) } close(work_channel) fmt.Println("sent work to work_channel") }() for x := range results_channel { fmt.Printf("result: %s\n", x) } } 
+4
source share
2 answers

Your solution is not a pool of working goroutines in any sense: your code does not limit concurrent goroutines, and it does not "reuse" goroutines (it always starts a new one when it receives a new job).

Manufacturer-Consumer Model

As I wrote in Bruteforce MD5 Password cracker , you can use the manufacturer-consumer template . You may have a designated goroutine manufacturer who will generate tasks (things to do / calculate) and send them to the tasks channel. You can have a fixed pool of consumer goroutines (for example, 5 of them), which will cross the channel on which tasks are performed, and each of them will perform / complete the received tasks.

The goroutine producer can simply close the jobs channel when all jobs have been generated and sent, which correctly signals consumers that there will be no more jobs. The for ... range construct on the channel handles the close event and completes correctly. Please note that all tasks sent before the channel closes will be delivered.

This will result in a clean design, will result in a fixed (but arbitrary) number of goroutines, and it will always use 100% CPU (if # goroutines are larger than # CPU cores). It also has the advantage that it can be throttled with the right choice of channel bandwidth (buffer channel) and the number of consumer routers.

Please note that this model is optional for the designated goroutine manufacturer. You can have several goroutines for creating tasks, but then you should also synchronize them to close only the jobs channel when all producer goroutines are done with creating tasks, otherwise try to send another task in the jobs channel when it was already closed as a result panic attacks. Usually the production of jobs is cheap and they can be produced much faster than they can be done, therefore this model for their production is 1 goroutine, while many of them consume / fulfill them, in practice they are good.

Processing Results:

If the tasks have results, you can choose the assigned result channel through which the results can be delivered ("sent back"), or you can decide to process the results with the consumer when the task is completed / completed. This last can be implemented using the "callback" function, which processes the results. The important thing is whether the results can be processed independently or whether they need to be combined (for example, a framework with a map reduction) or aggregated.

If you go with the results channel, you also need a goroutine that receives values ​​from it, not allowing users to block (this will happen if the results buffer is full).

With results channel

Instead of sending simple string values ​​as tasks and results, I would create a shell type that can contain any additional information, and therefore it is much more flexible:

 type Job struct { Id int Work string Result string } 

Note that the Job structure also carries the result, so when we submit the result, it also contains the original Job as a context - often very useful. Also note that it is beneficial to simply send pointers ( *Job ) in channels instead of Job values, so you don't need to make "countless" copies of Job s, and the size of the value of the Job structure becomes inappropriate.

Here is what this consumer-producer looks like:

I would use 2 sync.WaitGroup values, their role would follow:

 var wg, wg2 sync.WaitGroup 

The manufacturer is responsible for creating tasks that will be performed:

 func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } 

When you finish (there are no more jobs), the jobs channel is closed, which signals consumers that there will no longer be jobs.

Please note that produce() sees the jobs channel only as sending, because what the manufacturer should do only with this: send jobs to it (in addition to closing, but this is also allowed only on the sending channel), An accidental receipt in the producer will be an error compilation time (detected at the beginning, during compilation).

The responsibility of the consumer is to receive tasks until tasks can be received and complete them:

 func consume(id int, jobs <-chan *Job, results chan<- *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) job.Result = job.Work + fmt.Sprintf("-%dms", sleepMs) results <- job } } 

Note that consume() only sees the jobs channel for receive only; the consumer only needs to receive from him. Similarly, the results channel is sent only to the consumer.

Also note that the results channel cannot be closed here, since there are several consumer goroutines, and only the first attempt to close it will be successful, and further ones will lead to panic over time! results channel can (should) be closed after all consumer goroutines have ended, because then we can be sure that further values ​​(results) will not be sent through the results channel.

We have results that need to be analyzed:

 func analyze(results <-chan *Job) { defer wg2.Done() for job := range results { fmt.Printf("result: %s\n", job.Result) } } 

As you can see, this also gets the results until they can come (until the results channel is closed). The results channel for the analyzer is accepted only.

Pay attention to the use of channel types: whenever it is enough, use only the type of a unidirectional channel to detect and prevent errors at an early stage during compilation. Use only the bidirectional channel type if you need both directions.

And this is how it is glued together:

 func main() { jobs := make(chan *Job, 100) // Buffered channel results := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs, results) } // Start producing go produce(jobs) // Start analyzing: wg2.Add(1) go analyze(results) wg.Wait() // Wait all consumers to finish processing jobs // All jobs are processed, no more values will be sent on results: close(results) wg2.Wait() // Wait analyzer to analyze all results } 

Output Example:

Here is an example output:

As you can see, the results come and are analyzed before all tasks are queued:

 worker #4 received: 'e', sleep 81ms worker #0 received: 'a', sleep 887ms worker #1 received: 'b', sleep 847ms worker #2 received: 'c', sleep 59ms worker #3 received: 'd', sleep 81ms worker #2 received: 'f', sleep 318ms result: c-59ms worker #4 received: 'g', sleep 425ms result: e-81ms worker #3 received: 'h', sleep 540ms result: d-81ms worker #2 received: 'i', sleep 456ms result: f-318ms worker #4 received: 'j', sleep 300ms result: g-425ms worker #3 received: 'k', sleep 694ms result: h-540ms worker #4 received: 'l', sleep 511ms result: j-300ms worker #2 received: 'm', sleep 162ms result: i-456ms worker #1 received: 'n', sleep 89ms result: b-847ms worker #0 received: 'o', sleep 728ms result: a-887ms worker #1 received: 'p', sleep 274ms result: n-89ms worker #2 received: 'q', sleep 211ms result: m-162ms worker #2 received: 'r', sleep 445ms result: q-211ms worker #1 received: 's', sleep 237ms result: p-274ms worker #3 received: 't', sleep 106ms result: k-694ms worker #4 received: 'u', sleep 495ms result: l-511ms worker #3 received: 'v', sleep 466ms result: t-106ms worker #1 received: 'w', sleep 528ms result: s-237ms worker #0 received: 'x', sleep 258ms result: o-728ms worker #2 received: 'y', sleep 47ms result: r-445ms worker #2 received: 'z', sleep 947ms result: y-47ms result: u-495ms result: x-258ms result: v-466ms result: w-528ms result: z-947ms 

Try the full app on Go Playground .

No results channel

The code is greatly simplified if we do not use the results channel, but goroutines users process the result immediately (print it in our case). In this case, we do not need the values ​​2 sync.WaitGroup (the second only needed to wait for the analyzer to complete).

Without the results channel, the complete solution looks like this:

 var wg sync.WaitGroup type Job struct { Id int Work string } func produce(jobs chan<- *Job) { // Generate jobs: id := 0 for c := 'a'; c <= 'z'; c++ { id++ jobs <- &Job{Id: id, Work: fmt.Sprintf("%c", c)} } close(jobs) } func consume(id int, jobs <-chan *Job) { defer wg.Done() for job := range jobs { sleepMs := rand.Intn(1000) fmt.Printf("worker #%d received: '%s', sleep %dms\n", id, job.Work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) fmt.Printf("result: %s\n", job.Work+fmt.Sprintf("-%dms", sleepMs)) } } func main() { jobs := make(chan *Job, 100) // Buffered channel // Start consumers: for i := 0; i < 5; i++ { // 5 consumers wg.Add(1) go consume(i, jobs) } // Start producing go produce(jobs) wg.Wait() // Wait all consumers to finish processing jobs } 

I like the output with the results channel (but, of course, the execution / completion order is random).

Try this option on the Go Playground .

+10
source

You can implement a counting semaphore to limit goroutine concurrency.

 var tokens = make(chan struct{}, 20) func worker(id string, work string, o chan string, wg *sync.WaitGroup) { defer wg.Done() tokens <- struct{}{} // acquire a token before performing work sleepMs := rand.Intn(1000) fmt.Printf("worker '%s' received: '%s', sleep %dms\n", id, work, sleepMs) time.Sleep(time.Duration(sleepMs) * time.Millisecond) <-tokens // release the token o <- work + fmt.Sprintf("-%dms", sleepMs) } 

This is a general design used to limit the number of workers. You can, of course, change the location of the issue / receipt of tokens according to your code.

+1
source

Source: https://habr.com/ru/post/1265593/


All Articles