Job queue where workers can add jobs: is there an elegant way to stop the program when all workers are idle?

Hello,

I find myself in a situation where I have a job queue, and workers can add new jobs to it when they finish processing one.

For illustration, in the code below a job consists of counting up to JOB_COUNTING_TO, and one time in five, at random, the worker then adds a new job to the queue.
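Because each finished job adds at most one new job, with probability p = 1/5, the total amount of work is finite with probability 1. Treating each initial job as the start of a chain whose length L is geometrically distributed, the expected number of jobs for the constants used in the code (NB_INITIAL_JOBS = 300) works out as follows:

```latex
\begin{align*}
\Pr[L = k] &= p^{k-1}(1-p), \qquad k \ge 1 \\
\mathbb{E}[L] &= \sum_{k \ge 1} k\, p^{k-1}(1-p) = \frac{1}{1-p} = \frac{1}{1 - 1/5} = \frac{5}{4} \\
\mathbb{E}[\text{total jobs}] &= N_0 \cdot \mathbb{E}[L] = 300 \cdot \frac{5}{4} = 375
\end{align*}
```

So a run processes about 375 jobs on average, roughly 3.75 × 10^9 loop iterations in total, regardless of NB_WORKERS.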

Because my workers can add jobs to the queue, I understood that I could not use a channel as my job queue. Sending to a channel blocks until a receiver is ready (or, for a buffered channel, while the buffer is full), so with jobs adding jobs the program could easily reach a state where every worker is trying to send and no worker is left to receive.
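The blocking behaviour described above can be made visible without deadlocking the program by using `select` with a `default` case. This is a minimal sketch; `trySend` is a hypothetical helper, not something from the question:

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the
// buffered channel is full, i.e. exactly the case where a plain
// `ch <- v` would block the sending worker.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	jobs := make(chan int, 2) // buffer holds 2 jobs
	fmt.Println(trySend(jobs, 1)) // true
	fmt.Println(trySend(jobs, 2)) // true
	fmt.Println(trySend(jobs, 3)) // false: buffer full, `jobs <- 3` would block
}
```

If every worker is stuck in the equivalent of that third send, nobody is left to drain the buffer, which is the situation the question wants to avoid.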

This is why I decided to use a shared queue protected by a mutex.

Now, I would like the program to stop when all the workers are idle. Unfortunately, checking len(jobQueue) == 0 is not enough: the queue can be empty while some worker is still processing a job and may add a new one afterwards.
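The gap between "queue empty" and "work finished" can be closed by counting outstanding jobs (queued or in progress) instead of queued ones. Below is a minimal sketch of that counting rule with `sync.WaitGroup`, assuming a goroutine per job rather than a fixed pool, and a hypothetical deterministic job (a job of depth d > 0 adds exactly one child of depth d-1) so the result is checkable:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runJob is a hypothetical job: it counts itself and, if depth > 0,
// adds one child job of depth-1 (standing in for "a job adds a job").
func runJob(depth int, pending *sync.WaitGroup, processed *int64) {
	defer pending.Done() // runs last, after any child has been registered
	atomic.AddInt64(processed, 1)
	if depth > 0 {
		pending.Add(1) // register the child BEFORE this job's Done runs
		go runJob(depth-1, pending, processed)
	}
}

// countJobs starts nInitial jobs of the given depth and waits until no
// job is queued or running anywhere.
func countJobs(nInitial, depth int) int64 {
	var pending sync.WaitGroup
	var processed int64
	for i := 0; i < nInitial; i++ {
		pending.Add(1)
		go runJob(depth, &pending, &processed)
	}
	pending.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(countJobs(3, 2)) // 3 chains of 3 jobs each: prints 9
}
```

If the order were reversed (Done before Add), the counter could briefly reach zero while a child is about to start, which is the same problem as relying on len(jobQueue) == 0.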

The solution I came up with feels a bit clunky: it uses the variables var idleWorkerCount int and var isIdle [NB_WORKERS]bool to keep track of idle workers, and the program stops when idleWorkerCount == NB_WORKERS.
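One pattern that can replace idleWorkerCount and isIdle is a pending-job counter combined with `sync.Cond`: workers sleep in `Wait` while the queue is empty but work is still in flight, and whoever finishes the last outstanding job broadcasts so every worker can exit. A minimal sketch, assuming a hypothetical `queue` type with `push`, `pop` and `done` methods (the names are illustrative, not from the question):

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// queue is a hypothetical mutex-protected job queue that also tracks
// how many jobs are outstanding (queued or being processed).
type queue struct {
	mu      sync.Mutex
	cond    *sync.Cond
	jobs    []int
	pending int // queued + in progress; all work is done when this hits 0
}

func newQueue(initial []int) *queue {
	q := &queue{jobs: append([]int(nil), initial...), pending: len(initial)}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// push adds a job. A worker must call it before done() for its own job.
func (q *queue) push(job int) {
	q.mu.Lock()
	q.jobs = append(q.jobs, job)
	q.pending++
	q.mu.Unlock()
	q.cond.Signal() // wake one sleeping worker
}

// pop blocks until a job is available or all work is finished;
// ok is false once there is nothing left to do.
func (q *queue) pop() (job int, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.jobs) == 0 && q.pending > 0 {
		q.cond.Wait() // sleep instead of spinning on the mutex
	}
	if len(q.jobs) == 0 { // pending == 0: every worker can stop
		return 0, false
	}
	job, q.jobs = q.jobs[0], q.jobs[1:]
	return job, true
}

// done marks one job as finished; the last one wakes every worker.
func (q *queue) done() {
	q.mu.Lock()
	q.pending--
	finished := q.pending == 0
	q.mu.Unlock()
	if finished {
		q.cond.Broadcast()
	}
}

// run processes nInitial jobs with nWorkers goroutines and returns
// the total number of jobs processed, including added ones.
func run(nWorkers, nInitial, countTo int) int {
	initial := make([]int, nInitial)
	for i := range initial {
		initial[i] = countTo
	}
	q := newQueue(initial)

	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				job, ok := q.pop()
				if !ok {
					return
				}
				for i := 0; i < job; i++ { // simulated work
				}
				if rand.Intn(5) == 0 {
					q.push(countTo) // register the child before done()
				}
				mu.Lock()
				processed++
				mu.Unlock()
				q.done()
			}
		}()
	}
	wg.Wait()
	return processed
}

func main() {
	fmt.Println("jobs processed:", run(8, 300, 1000))
}
```

The only rule callers must respect is to push a child job before calling done for the job that produced it; otherwise pending could reach zero early.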

My question: is there a concurrency pattern that would make this logic more elegant?

Also, for a reason I don’t understand, my current technique (code below) becomes very inefficient when the number of workers is large (e.g. 300000): for the same number of jobs, the code is more than 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.
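One plausible contributor to the slowdown (not measured on the original program): idle workers never block. Each one keeps calling doJob, which takes mu, finds the queue empty and releases it, so hundreds of thousands of goroutines compete for one mutex with the few workers doing real work. The hypothetical `spinStats` below is an instrumented copy of that polling loop, without random child jobs, that counts those wasted lock acquisitions:

```go
package main

import (
	"fmt"
	"sync"
)

// spinStats mimics the polling loop from the question and counts how
// often a worker took the mutex only to find the queue empty.
func spinStats(nWorkers, nJobs, countTo int) (emptyPolls int) {
	var mu sync.Mutex
	queue := make([]int, nJobs)
	for i := range queue {
		queue[i] = countTo
	}
	idle := make([]bool, nWorkers)
	idleCount := 0

	var wg sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				mu.Lock()
				if idleCount == nWorkers { // everyone idle: stop
					mu.Unlock()
					return
				}
				if len(queue) == 0 {
					emptyPolls++ // a lock round-trip that did no work
					if !idle[id] {
						idle[id] = true
						idleCount++
					}
					mu.Unlock()
					continue
				}
				if idle[id] {
					idle[id] = false
					idleCount--
				}
				job := queue[0]
				queue = queue[1:]
				mu.Unlock()
				for i := 0; i < job; i++ { // simulated work
				}
			}
		}(w)
	}
	wg.Wait()
	return emptyPolls
}

func main() {
	for _, n := range []int{10, 1000} {
		fmt.Printf("workers=%d empty polls=%d\n", n, spinStats(n, 100, 100000))
	}
}
```

Every worker must mark itself idle before the loop can end, so the count is at least nWorkers and keeps growing while other workers are still busy. Since the jobs are CPU-bound, a pool of roughly runtime.NumCPU() workers that block while waiting is likely to do the same work with far less contention.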

Thank you very much in advance!

package main

import (
	"math/rand"
	"sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {

	mu.Lock()

	if len(jobQueue) == 0 {
		if !isIdle[workerId] {
			idleWorkerCount += 1
		}
		isIdle[workerId] = true
		mu.Unlock()
		return
	}

	if isIdle[workerId] {
		idleWorkerCount -= 1
	}
	isIdle[workerId] = false

	var job int
	job, jobQueue = jobQueue[0], jobQueue[1:]
	mu.Unlock()

	for i := 0; i < job; i += 1 {
	}

	if rand.Intn(5) == 0 {
		mu.Lock()
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
		mu.Unlock()
	}

}

func main() {

	// Filling up the queue with initial jobs
	for i := 0; i < NB_INITIAL_JOBS; i += 1 {
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
	}

	var wg sync.WaitGroup
	for i := 0; i < NB_WORKERS; i += 1 {
		wg.Add(1)
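		go func(workerId int) {
			defer wg.Done()
			for {
				// Read the shared counter under the mutex to avoid a
				// data race with the writes in doJob.
				mu.Lock()
				allIdle := idleWorkerCount == NB_WORKERS
				mu.Unlock()
				if allIdle {
					return
				}
				doJob(workerId)
			}
		}(i)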
	}
	wg.Wait()
}

Hi @tcosmo,

I just came across your question. Is it still an issue six days later?

I have two questions about your initial assumptions and conclusions.

  1. Is there a reason why workers need to generate new jobs rather than just working on jobs they receive? In other words, can you achieve the business goal without allowing workers to feed new jobs to the queue?
  2. You say that buffered channels don’t work because, if all workers are blocked trying to send new jobs to an already full channel, none of them can receive new jobs. This sounds as if each worker runs in a single goroutine. How about adding an extra goroutine to the worker code for submitting jobs back to the queue? That way, workers stay available to receive new jobs even when the sending goroutine blocks.
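The second suggestion could look like the sketch below: each worker hands a new job to a short-lived goroutine, so a full buffer blocks only that goroutine while the worker goes back to receiving. Counting outstanding jobs with a `sync.WaitGroup` (an addition, not part of the suggestion above) tells the program when it can close the channel so the workers' `range` loops end. The function name and buffer size are illustrative assumptions:

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
)

// runWithChannel: workers receive from a buffered channel and submit
// new jobs from separate goroutines, so they never block on a send.
func runWithChannel(nWorkers, nInitial, countTo int) int64 {
	jobs := make(chan int, nWorkers) // buffer size is an arbitrary choice
	var pending sync.WaitGroup      // queued + in-progress jobs
	var processed int64

	pending.Add(nInitial)
	go func() {
		for i := 0; i < nInitial; i++ {
			jobs <- countTo
		}
	}()

	var workers sync.WaitGroup
	for w := 0; w < nWorkers; w++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for job := range jobs { // ends when jobs is closed
				for i := 0; i < job; i++ { // simulated work
				}
				if rand.Intn(5) == 0 {
					pending.Add(1)                  // register the child before Done below
					go func() { jobs <- countTo }() // may block; the worker does not
				}
				atomic.AddInt64(&processed, 1)
				pending.Done()
			}
		}()
	}

	pending.Wait() // nothing queued or running, so no sender is still active
	close(jobs)
	workers.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("jobs processed:", runWithChannel(8, 300, 1000))
}
```

Closing jobs is safe at that point because a job is only marked done after it has been received, so every sending goroutine has already finished its send.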