Worker pools - wait until all goroutines finish jobs

Hey guys! I have written my first app in Go, and everything works fine except one thing: I use a worker pool to run tasks in parallel, and I have noticed that the program does not wait for all goroutines to finish. I used this example:

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()

    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    for i := 0; i < googleWorkersAmount; i++ {
        go provider.executeWork(jobs)
    }
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
            continue
        }

        dataSets := provider.getDataSets(row)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    close(jobs)
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        provider.buildVideo(googleVideoFile, sheetsVideo)
        provider.Logger.Info("Video saved")
    }
}

So I expected the last message in the console to be “Obtaining videos from google Drive is finished, good job!”, but instead I see:
“Obtaining videos from google Drive is finished, good job!”
“Video saved”
“Video saved”
I tried rewriting the code a little and followed all the steps from the above link (adding a results channel):

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    results := make(chan int, googleWorkersAmount)
    for i := 0; i < googleWorkersAmount; i++ {
        go provider.executeWork(jobs, results)
    }
    counter := 0
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
            continue
        }
        counter++
        dataSets := provider.getDataSets(row)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    provider.Logger.WithFields(logFields).Info("DEBUGGGGGGG")
    for a := 0; a < counter; a++ {
        <-results
    }
    close(jobs)
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo, results chan<- int) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        video, _ := provider.buildVideo(googleVideoFile, sheetsVideo)
        results <- 1
        provider.Logger.WithFields(logrus.Fields{
            "fileName": video.FileName,
        }).Info("video saved")
    }
}

In this case my program hangs. I don't see any errors, or even my debug message “DEBUGGGGGGG”. Could somebody explain what is wrong?
P.S. I know about WaitGroup, but I want to figure out how worker pools work.

Hi,

In your first example, the range loop in each worker goroutine keeps running as long as there is data in the channel and then exits. But the main goroutine finishes writing to the channel before the workers have finished reading from it and processing the jobs, so it continues executing and prints the final message.
I tried the second example; after adding another channel for the results, it fails with a "goroutines are asleep" (deadlock) error. I will look into the second one further and let you know.
Kindly correct me if I have misunderstood something.
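
A possible explanation for the hang in the second example, judging only from the code as posted: the workers block on results <- 1 once the results buffer is full, because the main goroutine does not read from results until it has sent every job. The main goroutine in turn blocks on jobs <- sheetsVideo once the jobs buffer is full, so it never reaches the debug message. In addition, counter counts rows while jobs are sent per data set, so the two numbers may not match. Below is a minimal sketch of a results-channel pool that avoids both problems. The name runWithResults and the int job type are hypothetical stand-ins, not part of the original code.

```go
package main

import "fmt"

// runWithResults is a hypothetical stand-in for FetchVideos. It sends n jobs
// to `workers` goroutines and returns after receiving exactly n results.
func runWithResults(workers, n int) int {
	jobs := make(chan int, workers)
	results := make(chan int, workers)

	for i := 0; i < workers; i++ {
		go func() {
			for job := range jobs {
				results <- job * 2 // placeholder for the real work
			}
		}()
	}

	// Send from a separate goroutine so this function is free to drain
	// results while jobs are still being queued.
	go func() {
		for j := 0; j < n; j++ {
			jobs <- j
		}
		close(jobs)
	}()

	// Wait for one result per job sent (not one per row).
	received := 0
	for a := 0; a < n; a++ {
		<-results
		received++
	}
	return received
}

func main() {
	fmt.Println("results received:", runWithResults(3, 100))
}
```

An alternative under the same assumptions is to give results a buffer large enough for every job, at the cost of knowing the job count up front.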

Hello! Thank you for your response, @Gowtham_Girithar. What I expected was to see the text “Obtaining videos from google Drive is finished, good job!” as the last message, while in the first case I see several “video saved” messages after it.
Taking this into account, I have concluded that the main goroutine doesn’t wait for all workers to finish.

Your first code is correct. What is missing is synchronization. The main goroutine that prints "Obtaining videos from google Drive is finished, good job!" should first close the channel and then wait until all worker goroutines have terminated.

Here is how you would do that with a sync.WaitGroup:

// synchronization variable
var wg sync.WaitGroup

func (provider *GoogleDriveProvider) FetchVideos() {
    resp, _ := provider.SheetService.Spreadsheets.Values.Get(provider.SpreadsheetId, provider.SpreadsheetRange).Do()
    wg.Add(googleWorkersAmount)
    jobs := make(chan *googleSheetsVideo, googleWorkersAmount)
    for i := 0; i < googleWorkersAmount; i++ {
        go provider.executeWork(jobs)
    }
    for _, row := range resp.Values {
        if len(row) <= minRowLengthForSheets {
            continue
        }

        dataSets := provider.getDataSets(row)
        for _, sheetsVideo := range dataSets {
            jobs <- sheetsVideo
        }
    }
    close(jobs)
    wg.Wait()
    provider.Logger.Info("Obtaining videos from google Drive is finished, good job!")
}

func (provider *GoogleDriveProvider) executeWork(jobs <-chan *googleSheetsVideo) {
    for sheetsVideo := range jobs {
        googleVideoFile, _ := sheetsVideo.fetchVideoData()
        provider.buildVideo(googleVideoFile, sheetsVideo)
        provider.Logger.Info("Video saved")
    }
    wg.Done()
} 

I added the wg variable and the calls to its methods.

A WaitGroup is a counter that is incremented with Add() and decremented with Done(). You initially add the number of goroutines that you’ll start, so that the counter equals the number of goroutines. The Wait() method blocks until the counter has reached 0. The counter is decremented by each goroutine calling Done() when it has finished its work.

The Wait() call must be performed after close(jobs), because closing the channel notifies the goroutines that they can terminate. It is performed before the final print message, which will therefore be executed only once all goroutines have terminated.
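
As a self-contained illustration of the counter behaviour described above, here is a minimal sketch. It assumes a local WaitGroup instead of a package-level variable and uses defer wg.Done() so the counter is decremented even if a worker returns early. The name processAll and the int job type are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll starts `workers` goroutines, feeds them n jobs, and returns
// only after every worker has exited. It returns the number of jobs handled.
func processAll(workers, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	handled := 0

	jobs := make(chan int, workers)
	wg.Add(workers) // counter = number of goroutines about to start
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done() // decrement when this worker exits
			for range jobs {
				mu.Lock()
				handled++
				mu.Unlock()
			}
		}()
	}

	for j := 0; j < n; j++ {
		jobs <- j
	}
	close(jobs) // lets the workers' range loops end
	wg.Wait()   // blocks until the counter is back to 0
	return handled
}

func main() {
	fmt.Println("jobs handled:", processAll(4, 20))
	fmt.Println("all workers finished")
}
```

Keeping the WaitGroup local means that two concurrent calls do not share one counter.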

I thought that channels were another way to wait until all goroutines are finished, without a WaitGroup.

Channels are good for broadcasting a signal, typically a termination signal. It would be a main goroutine notifying a set of other goroutines that they should stop. This is done by closing a channel: a receive on a closed channel never blocks, so every goroutine waiting on it is released at once.
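
A minimal sketch of that broadcast pattern, assuming a hypothetical stopAll helper and a done channel of type chan struct{}. The WaitGroup here is only used to confirm that every goroutine saw the signal.

```go
package main

import (
	"fmt"
	"sync"
)

// stopAll starts n goroutines that block until `done` is closed, then closes
// it and returns how many goroutines observed the signal.
func stopAll(n int) int {
	done := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	stopped := 0

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			<-done // blocks until the channel is closed
			mu.Lock()
			stopped++
			mu.Unlock()
		}()
	}

	close(done) // one close releases every waiting goroutine
	wg.Wait()
	return stopped
}

func main() {
	fmt.Println("goroutines stopped:", stopAll(5))
}
```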

Here we have the main goroutine that has to wait until all other goroutines have terminated. This is the typical use case for a WaitGroup.

You could implement something equivalent with channels, but it wouldn’t be as straightforward. You would for instance create one channel per worker goroutine, and the main goroutine would store them in a slice. When a worker goroutine terminates, it closes its channel to signal its termination. The main goroutine iterates over all the channels in the slice and reads from each one. Once a channel is closed, the read unblocks and the main goroutine can proceed to the next channel. When it reaches the end of the slice, all channels have been closed.

This works too, but the WaitGroup counter is simpler than a slice of channels.
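
The slice-of-channels approach described above could look roughly like this. It is a sketch under assumptions: runWorkers, the int job type, and the per-worker counts slice are hypothetical and only there to make the example checkable.

```go
package main

import "fmt"

// runWorkers feeds n jobs to `workers` goroutines and waits for them using
// one "finished" channel per worker instead of a WaitGroup.
func runWorkers(workers, n int) int {
	jobs := make(chan int, workers)
	finished := make([]chan struct{}, workers)
	counts := make([]int, workers) // each worker writes only its own slot

	for i := range finished {
		finished[i] = make(chan struct{})
		go func(id int, done chan<- struct{}) {
			defer close(done) // signal termination by closing
			for range jobs {
				counts[id]++
			}
		}(i, finished[i])
	}

	for j := 0; j < n; j++ {
		jobs <- j
	}
	close(jobs)

	// Reading from each channel blocks until that worker has closed it.
	total := 0
	for i, done := range finished {
		<-done
		total += counts[i]
	}
	return total
}

func main() {
	fmt.Println("jobs handled:", runWorkers(3, 12))
}
```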

Thanks for the explanation!!

Don’t forget to flag the question as solved/answered please.