Hi, I’m new to concurrency in Go.
My question is specifically about these lines of code:
wg.Wait()
close(results)
The goroutines that the wait group is waiting on range over a channel, which blocks indefinitely unless the channel is closed. The close statement comes after the wait, so the channel never closes and the workers just block. In almost every example I can find online explaining the concept, the workers know when to stop; that is, their loop condition is not a range.
The thinking for go do(work) is that I don’t care how long the calls take or when they finish, just that I get results. Some might retry and some take a variable amount of time, so I’m not using a wait group for them (this could be a mistake and I just don’t know it). The go do(work) calls will complete more quickly than the workers can process the results; the more work there is, the longer the workers take.
The workers should have parallel access to whatever work is done, and process it in some way. This works with the expected results when I add a time.Sleep to the blocking part of the main function; however, adding wg.Wait() results in a panic. I’m assuming that’s because defer wg.Done() never runs, so the counter never decrements, and the workers hang because of their range.
Is there a small tweak I can make to my code to make this work or do I have to rethink the problem?
I’ve tried various other ways to signal that the work is complete and that it’s safe to close the channel, using select, various flow-control statements, and helper channels, but I can’t quite get it to work exactly how I’d like. Can anyone steer me in the right direction?
package main

import (
	"fmt"
	"sync"
	"time"
)

// some complicated work that takes ~500ms
func do(num int, ch chan<- int) {
	// multiple results are passed to the channel in the real code
	time.Sleep(500 * time.Millisecond)
	/*
		real work here is done to produce a new value to put in the channel,
		sometimes producing 2 or 3 values which are added to ch in this call.
		I've not properly illustrated that with num = num*num, but the number
		comes back different, and sometimes multiple entries are added
	*/
	num = num * num
	ch <- num
}

func main() {
	results := make(chan int)

	// some number of required complicated work
	for i := 0; i < 53; i++ {
		go do(i, results)
	}

	var wg sync.WaitGroup
	// start 3 workers which can process results
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, results)
		}(i)
	}

	// handle closing the channel when all workers complete
	// panic
	wg.Wait()
	close(results)

	// works
	//time.Sleep(10 * time.Second)
	fmt.Println("donezo")
}

// process the results of do() in a meaningful way
func worker(id int, ch <-chan int) {
	fmt.Println("starting worker", id)
	for i := range ch {
		fmt.Println("channel val:", i)
	}
}