Hello,
I find myself in a situation where I have a queue of jobs where workers can add new jobs when they are done processing one.
For illustration, in the code below, a job consists in counting up to JOB_COUNTING_TO
and, randomly, 1/5 of the time a worker will add a new job to the queue.
Because my workers can add jobs to the queue, it is my understanding that I was not able to use a channel as my job queue. This is because sending to the channel is blocking and, even with a buffered channel, this code, due to its recursive nature (jobs adding jobs) could easily reach a situation where all the workers are sending to the channel and no worker is available to receive.
This is why I decided to use a shared queue protected by a mutex.
Now, I would like the program to halt when all the workers are idle. Unfortunately this cannot be spotted just by looking for when len(jobQueue) == 0
as the queue could be empty but some worker still doing their job and maybe adding a new job after that.
The solution I came up with is, I feel a bit clunky, it makes use of variables var idleWorkerCount int
and var isIdle [NB_WORKERS]bool
to keep track of idle workers and the code stops when idleWorkerCount == NB_WORKERS
.
My question is if there is a concurrency pattern that I could use to make this logic more elegant?
Also, for some reason I don’t understand the technique that I currently use (code below) becomes really inefficient when the number of Workers becomes quite big (such as 300000 workers): for the same number of jobs, the code will be > 10x slower for NB_WORKERS = 300000
vs NB_WORKERS = 3000
.
Thank you very much in advance!
package main
import (
"math/rand"
"sync"
)
const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000
var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool
func doJob(workerId int) {
mu.Lock()
if len(jobQueue) == 0 {
if !isIdle[workerId] {
idleWorkerCount += 1
}
isIdle[workerId] = true
mu.Unlock()
return
}
if isIdle[workerId] {
idleWorkerCount -= 1
}
isIdle[workerId] = false
var job int
job, jobQueue = jobQueue[0], jobQueue[1:]
mu.Unlock()
for i := 0; i < job; i += 1 {
}
if rand.Intn(5) == 0 {
mu.Lock()
jobQueue = append(jobQueue, JOB_COUNTING_TO)
mu.Unlock()
}
}
func main() {
// Filling up the queue with initial jobs
for i := 0; i < NB_INITIAL_JOBS; i += 1 {
jobQueue = append(jobQueue, JOB_COUNTING_TO)
}
var wg sync.WaitGroup
for i := 0; i < NB_WORKERS; i += 1 {
wg.Add(1)
go func(workerId int) {
for idleWorkerCount != NB_WORKERS {
doJob(workerId)
}
wg.Done()
}(i)
}
wg.Wait()
}