Parallel loop

I want a for loop to run in parallel using goroutines. My main problem is that I need to wait for all iterations to complete before continuing, which is why just prefixing the call with `go` doesn't work. I tried using channels (though probably incorrectly), but that made my code even slower.

```go
func createPopulation(populationSize int, individualSize int) []Individual {
	population := make([]Individual, populationSize)
	// I want this loop to run in parallel
	for i := 0; i < len(population); i++ {
		population[i] = createIndividual(individualSize)
	}
	return population
}

func createIndividual(size int) Individual {
	var individual = Individual{make([]bool, size), 0}
	for i := 0; i < len(individual.gene); i++ {
		if rand.Intn(2)%2 == 1 {
			individual.gene[i] = true
		} else {
			individual.gene[i] = false
		}
	}
	return individual
}
```

My structure is as follows:

```go
type Individual struct {
	gene    []bool
	fitness int
}
```
5 answers

So basically a goroutine should not return a value, but push one to a channel. If you want to wait until all goroutines are complete, you can simply count the number of goroutines, or use a WaitGroup. In this example a WaitGroup is overkill because the size is known, but it is good practice. Here is a modified example:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

type Individual struct {
	gene    []bool
	fitness int
}

func createPopulation(populationSize int, individualSize int) []Individual {
	// we create a slice with a capacity of populationSize but 0 length,
	// so we'll avoid extra unneeded allocations
	population := make([]Individual, 0, populationSize)

	// we create a buffered channel so writing to it won't block
	// while we wait for the waitgroup to finish
	ch := make(chan Individual, populationSize)

	// we create a waitgroup - basically block until N tasks say they are done
	wg := sync.WaitGroup{}

	for i := 0; i < populationSize; i++ {
		// we add 1 to the wait group - each worker will decrease it back
		wg.Add(1)
		// now we spawn a goroutine
		go createIndividual(individualSize, ch, &wg)
	}

	// now we wait for everyone to finish - again, not a must;
	// you could just receive from the channel N times, with a timeout for safety
	wg.Wait()

	// we need to close the channel or the following loop will block forever
	close(ch)

	// we iterate over the closed channel and receive all data from it
	for individual := range ch {
		population = append(population, individual)
	}
	return population
}

func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) {
	var individual = Individual{make([]bool, size), 0}
	for i := 0; i < len(individual.gene); i++ {
		if rand.Intn(2)%2 == 1 {
			individual.gene[i] = true
		} else {
			individual.gene[i] = false
		}
	}
	// push the individual down the channel
	ch <- individual
	// let the wait group know we finished
	wg.Done()
}

// main was missing in the original snippet (which would not compile
// with the unused fmt import); a minimal one is added here
func main() {
	population := createPopulation(10, 8)
	fmt.Println(len(population))
}
```

For your specific problem you do not need to use channels at all.

However, unless your createIndividual spends significant time on computation, the overhead of spawning and switching between goroutines will make the parallel version slower, as the benchmark below shows.

```go
// package and import lines were missing in the original snippet
package main

import (
	"flag"
	"fmt"
	"math/rand"
	"runtime"
	"sync"
)

type Individual struct {
	gene    []bool
	fitness int
}

func createPopulation(populationSize int, individualSize int) (population []*Individual) {
	var wg sync.WaitGroup
	population = make([]*Individual, populationSize)
	wg.Add(populationSize)
	for i := 0; i < populationSize; i++ {
		go func(i int) {
			population[i] = createIndividual(individualSize)
			wg.Done()
		}(i)
	}
	wg.Wait()
	return
}

func createIndividual(size int) *Individual {
	individual := &Individual{make([]bool, size), 0}
	for i := 0; i < size; i++ {
		individual.gene[i] = rand.Intn(2)%2 == 1
	}
	return individual
}

func main() {
	numcpu := flag.Int("cpu", runtime.NumCPU(), "")
	flag.Parse()
	runtime.GOMAXPROCS(*numcpu)
	pop := createPopulation(1e2, 21e3)
	fmt.Println(len(pop))
}
```

Output:

```
┌─ oneofone@Oa [/tmp]
└──➜ go build blah.go; xtime ./blah -cpu 1
100
0.13u 0.00s 0.13r 4556kB ./blah -cpu 1
┌─ oneofone@Oa [/tmp]
└──➜ go build blah.go; xtime ./blah -cpu 4
100
2.10u 0.12s 0.60r 4724kB ./blah -cpu 4
```

One common way to add managed parallelism to a loop like this is to create a number of worker goroutines that read jobs from a channel. The runtime.NumCPU function can help you decide how many workers it makes sense to run (make sure you set GOMAXPROCS to take advantage of those CPUs). You then simply write tasks to the channel, and they will be processed by the workers.

In this case, where the job is to initialize elements of an existing slice in place, using a channel of *Individual pointers makes sense. Something like this:

```go
ch := make(chan *Individual)
for i := 0; i < nworkers; i++ {
	go initIndividuals(individualSize, ch)
}

population := make([]Individual, populationSize)
for i := 0; i < len(population); i++ {
	ch <- &population[i]
}
close(ch)
```

A worker goroutine would look something like this:

```go
func initIndividuals(size int, ch <-chan *Individual) {
	for individual := range ch {
		// alternatively, inline the createIndividual() code here
		// if this is the only call site
		*individual = createIndividual(size)
	}
}
```

Since the tasks are not distributed ahead of time, it does not matter whether createIndividual takes a variable amount of time: each worker picks up a new task only when its previous one is finished, and exits when no tasks remain (because the channel has been closed by that point).

But how do we know when the work is complete? The sync.WaitGroup type can help here. The code that spawns the worker goroutines can be changed as follows:

```go
ch := make(chan *Individual)
var wg sync.WaitGroup
wg.Add(nworkers)
for i := 0; i < nworkers; i++ {
	go initIndividuals(individualSize, ch, &wg)
}
```

The initIndividuals function is also modified to accept the extra parameter and to add defer wg.Done() as its first statement. Now a call to wg.Wait() will block until all workers have completed, after which you can return the fully constructed population slice.


Since you know in advance how many individuals you will have, I would refrain from using channels and simply assign each population member to its own createIndividual goroutine. The createIndividual signature would look like this:

```go
func createIndividual(wg *sync.WaitGroup, individual *Individual, size int)
```

and the calling code will look like this:

```go
population := make([]Individual, populationSize)
wg := &sync.WaitGroup{}
wg.Add(len(population))
for i := 0; i < len(population); i++ {
	go createIndividual(wg, &population[i], individualSize)
}
wg.Wait()
```

So each goroutine is responsible for only one individual, which it assigns to the corresponding slot in population:

```go
func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) {
	defer wg.Done()
	*individual = Individual{make([]bool, size), 0}
	// assign other attributes to `individual`
}
```

You can see the full code example here.


If you want to avoid mixing concurrency logic with business logic, I wrote the https://github.com/shomali11/parallelizer library to help with this. It encapsulates the concurrency logic, so you don't need to worry about it.

So in your example:

```go
package main

import (
	"fmt"
	"time"

	"github.com/shomali11/parallelizer"
)

func main() {
	populationSize := 100
	results := make([]*Individual, populationSize)

	// Options/Timeout as used in the original answer;
	// check the library's current API, which may differ
	options := &parallelizer.Options{Timeout: time.Second}
	group := parallelizer.NewGroup(options)
	for i := 0; i < populationSize; i++ {
		group.Add(func(index int) func() {
			return func() {
				...
				results[index] = &Individual{...}
			}
		}(i))
	}

	err := group.Run()

	fmt.Println("Done")
	fmt.Printf("Results: %v\n", results)
	fmt.Printf("Error: %v\n", err) // nil if it completed, non-nil if it timed out
}
```

Source: https://habr.com/ru/post/970886/

