Closing a ranged over channel inside worker goroutines

Hi, I’m new to concurrency in go

My question is specifically around these lines of code:

	wg.Wait()
	close(results)

The goroutines that the wait group are waiting on range over a channel, which will block indefinitely unless closed. The close statement is happening after the wait so the channel never closes and the workers just block. In almost every example online I can find explaining the concept, the workers know when to stop, as in, its loop conditional is not a range.

The thinking for go do(work) is I don’t care how long they take or when they finish just that I get results, some might retry, some take variable time so I’m not using a wait group for them (this could be a mistake and I just don’t know it). go do(work) will complete quicker than it takes to process the results in the worker, the more work, the longer the workers take.

The workers should have parallel access to whatever work is done, and process it in some way. This works with expected results when I add a time.Sleep to the blocking part of the main function however adding wg.Wait() results in a panic. I’m assuming because defer wg.Done() is never decrements, and the workers hang on due to their range.

Is there a small tweak I can make to my code to make this work or do I have to rethink the problem?

I’ve tried various other ways to signal that the work being done is complete and that it’s safe to close the channel using select and varous flow control statements and helper channels but I can’t quite seem to get it to work exactly how I’d like. Can anyone steer me in the right direction?

package main

import (
	"fmt"
	"sync"
	"time"
)

// some complicated work that takes ~500ms
func do(num int, ch chan<- int) {
	// multiple results are passed to the channel in the real code
	time.Sleep(time.Duration(500 * time.Millisecond))

	/*
		real work here is done to produce a new value to put in the channel
                some times producing 2 or 3 values which are added to ch in this call
                I've not properly illustrated that with num = num*num but number comes back
                different, and some times multiple entires are added
	*/
        num = num * num
	ch <- num
}

func main() {

	results := make(chan int)

	// some number of required complicated work
	for i := 0; i < 53; i++ {
		go do(i, results)
	}

	var wg sync.WaitGroup

	// start 3 workers which can process results
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, results)
		}(i)
	}

	// handle closing the channel when all workers complete

        // panic
	wg.Wait()
	close(results)

	// works
	//time.Sleep(time.Duration(10 * time.Second))

	fmt.Println("donezo")
}

// process the results of do() in a meaningful way
func worker(id int, ch <-chan int) {
	fmt.Println("starting worker", id)

	for i := range ch {
		fmt.Println("channel val:", i)
	}
}

Hi @b0bu, the theory says that you should close the channel in the sender (the “do” function in your scenario).
The close function only sends a message to the channel and your workers will finish the range loop.

Regards!

1 Like

I oringally asked “Is there a small tweak I can make to my code to make this work or do I have to rethink the problem?” The answer I’ve discovered is, yes. Please correct any misconceptions I may have made in my clunky explaination.

I had to learn an interesting fundamental concept about channels: You can read from a closed channel. As mentioned the range in my original example never terminates because I couldn’t find a good place to close the channel, and even when I forced it some other way the program was exhibiting undesired behaviour i.e. exiting short of processing all the contents within a channel, deadlocks or send on closed channel. This is because of the nuances of the “real” code where processing the contents of a channel takes longer than it took to fill the channel which was true in my case, things were just out of sync.

Since there’s no clear practical way within my sender to close the channel it’s acceptable to do it in main, so I have via a goroutine. I’ve then wrapped the worker in it’s own wait group and I use that to block the progam for exiting, allowing the works to “finish”. By finish I mean when there is no more data to send, the channel is closed but still readable, this gives the range loop a terminating case as it’ll end when the default value of the channel is returned. The way I think about this is, there’s no way to know the length of an unbuffered channel as it’s being ranged over and perhaps written to, if you don’t know how many values you’re going to push to the channel, which is true in my case, until you close it, since closing it says read whatever is left until the terminating value ends the range as long as you’re “waiting” for that to happen.

Here’s the working example Go Playground - The Go Programming Language

I don’t exactly understand what you mean about reading from a closed channel, but doing this:

	go func() {
		wg.Wait()
		close(results)
	}()

In your code seems like a fine way to handle closing the results channel :slight_smile:

Thanks for the feedback. Yea the “closing” part is what I’m still trying to get my head around. I meant, closing a channel that’s ranged over, in the rendezvous pattern doesn’t kill the range if there’s more values to read. The channel is drained until a sort of EOF or default end of channel value is returned. This is easier to visualize when you buffer the channel.

output

filling 0
filling 1
filling 2
filling 3
filling 4
filling 5
filling 6
filling 7
filling 8
filling 9
closed
empyting 0
empyting 1
empyting 2
empyting 3
empyting 4
empyting 5
empyting 6
empyting 7
empyting 8
empyting 9

Maybe you can use two channels, one for data and another for control. Then read from the channels in an infinite loop. break if you read the appropriate message the control channel or if the data channel is closed.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.