Channels to control Goroutines

Hi,

I have a large number of jobs to process that can be executed in parallel, but I want at most n jobs to run concurrently at any one time. Jobs could be added at any time. I was thinking of creating two functions to increment and decrement a counter variable through a channel as a method of control, but in truth, I’m a bit lost here. Is there an established pattern for doing this sort of thing?

Thanks
Cam

Hi @tranman,

Welcome to the forum.

I would approach such a task in the following way.

First, I’d start a goroutine only if there is work to do, and the goroutine would finish running as soon as the work is done. This avoids having to manage a “worker pool” of goroutines.

Then I can use various ways of tracking and limiting the number of goroutines that run at the same time.

For example:

  1. Create a buffered channel of size n. Before starting a goroutine, try sending a value to that channel. If the channel is full, the start must wait for another goroutine to exit. When a goroutine is done, it reads one item out of that channel.

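  2. Use a counting semaphore, for example semaphore.Weighted from golang.org/x/sync/semaphore, created with a capacity of n. New goroutines can start as long as Acquire() returns. Goroutines that are done call Release(). (A sync.Pool is not suitable here: Get() never blocks, and the pool may drop its items at any time.)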

  3. Or use a simple integer and manage concurrent access to it with a mutex. (To avoid data races.)

I don’t know if there is one specific pattern for this that can be considered established. The main point with any of these approaches is that the counting must be done in a thread-safe way.

@christophberger

Thanks for your reply. After a bit of thrashing about, I came up with the following. It seems to do the job :slight_smile:

Thanks
Cam


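package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

// Job holds a counter of goroutines that have not finished yet.
type Job struct {
	Mu             sync.Mutex
	GoRoutineCount int
}

var job Job

// runThread simulates one job, frees its slot in ch, and signals done
// when it was the last goroutine still counted.
func (j *Job) runThread(ch chan int, done chan bool) {
	time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
	<-ch // free a slot so the producer can start another job
	j.Mu.Lock()
	log.Printf("%-20s %d\n", "No of Active GoRoutines", j.GoRoutineCount)
	j.GoRoutineCount--
	last := j.GoRoutineCount == 0 // decide under the lock to avoid a data race
	j.Mu.Unlock()
	if last {
		done <- true
	}
}

func main() {
	chBuffered := make(chan int, 8)
	noOfJobs := 25
	chDone := make(chan bool)
	// Count the producer goroutine itself, so the counter cannot reach
	// zero before every job has been started.
	job.GoRoutineCount = 1
	go func() {
		for i := 0; i < noOfJobs; i++ {
			job.Mu.Lock()
			job.GoRoutineCount++
			job.Mu.Unlock()
			chBuffered <- i // blocks while 8 jobs are already running
			go job.runThread(chBuffered, chDone)
		}
		// All jobs are started; drop the producer's own count.
		job.Mu.Lock()
		job.GoRoutineCount--
		last := job.GoRoutineCount == 0
		job.Mu.Unlock()
		if last {
			chDone <- true
		}
	}()
	<-chDone
}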

Looks good, and runs well in the Playground!

Here is my take on it: Go Playground - The Go Programming Language

  • I chose the “buffered channel” method for setting the upper limit.
  • Before starting a new goroutine, I try writing to the channel. When the channel gets full, the write blocks until another goroutine that is about to end reads one value from the channel.
  • The channel item type is “empty struct” to signal the reader that no actual data is sent to the channel. You could use bool or int as well.
  • I use a WaitGroup to have the main goroutine wait for all others.

Truth is, I don’t like mutexes :slight_smile:
In more complex synchronization scenarios, it is too easy to get the timings wrong, resulting in races or deadlocks that can be hard to track down. Channels are a convenient abstraction over mutexes (in fact, channels use mutexes internally) and easier to reason about.

@christophberger ha. very good, thanks
