Bool variable getting set outside scope

I worked on this issue for many hours before finding a fix, although I don’t understand why this was needed.

I have a function-level variable, doRealTime (a bool), in a function called RealTimeQuotes(). It is set to true and determines whether the function should continue running. Inside the loop, data is retrieved using a web socket and added to a buffered chan with a buffer size of 30. Once the data (up to 14 records currently) has been added to the chan, with wg.Add(1) called for each record, there is a wg.Wait(). wg is a modular-level variable (not sure what the proper terminology is for that). It is declared outside of the functions.

I have another function, SseHandlerRealTime(w http.ResponseWriter, request *http.Request), which is an HTTP handler called from the browser every 5 seconds or so using server-sent events.

Here is the basic layout of the relevant code:

var wg sync.WaitGroup
var channelStocks chan *data.StockRealTime

func SseHandlerRealTime(w http.ResponseWriter, request *http.Request) {
	stocksToSend := make([]*data.StockRealTime, 0)

	for stock := range channelStocks {
		// add to array (stocksToSend) to send to browser
	}
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	
	jsonStock, _ := json.Marshal(stocksToSend)
	stringJSONStock := string(jsonStock)
	fmt.Fprintf(w, "data: %v\n\n", stringJSONStock)
	
}

func (realTime RealTime) RealTimeQuotes(){
	var doRealTime bool // later moved to modular-level to fix issue

	wg = sync.WaitGroup{}
	doRealTime = true
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	
	go func() {
		<-realTimeExecutionChannel
		doRealTime = false
		channelStocks = make(chan *data.StockRealTime, channelBufferSize)
		realTimeExecutionChannel <- struct{}{}
	}()

	for doRealTime{
		// get latest ticks from websocket-based function
		for tickKey, tick := range newTicksMap {
			var stockRealTime data.StockRealTime
			// populate stockRealTime 
			wg.Add(1)
			channelStocks <- &stockRealTime
			
		}
		close(channelStocks)
		wg.Wait()
	}
}

The goroutine allows me to cancel from the browser. This code works fine on the first run. If I cancel it and then run it again (without restarting the web server), then when it hits wg.Wait() the second time in that run, doRealTime somehow becomes false, causing the function to exit. I added extensive logging to try to fix this issue. I finally got it to work correctly by changing the scope of doRealTime to modular-level, even though it is not accessed anywhere except in RealTimeQuotes(). This fixes the problem, but I don’t understand why.

Hi @rschluet
Sorry, but I didn’t quite understand the problem.
Just looking at the code, I see that you are re-initializing the “wg” package variable each time you call the “RealTimeQuotes” function. That doesn’t make sense to me. Maybe it is related to the issue?

That function is called only once. It then has a loop which keeps retrieving stock prices and putting them in the buffered channel, up to 20 or so. It waits until the channel is empty, then retrieves the next batch, and so on. The browser page has a cancel button which tells the loop to quit by setting doRealTime to false. This works as it should; however, if I run it again, when it hits wg.Wait(), doRealTime gets set to false somehow, even though it hits no code to do this. Making doRealTime modular-level solves the problem, but I don’t see why.

This is a guess, but I can imagine something like this happening:

  1. Code has been running for a bit; it’s currently here:
    // get latest ticks from websocket-based function

    • Whatever this code does is waiting to get the next batch of “ticks.”
    • SseHandlerRealTime is waiting on the current channelStocks channel value
  2. realTimeExecutionChannel is closed by clicking cancel.

  3. <-realTimeExecutionChannel executes:

    • Sets doRealTime = false
    • assigns a new channel to channelStocks (Note that SseHandlerRealTime is still waiting to retrieve from the old channelStocks channel value)
  4. // get latest ticks from websocket-based function completes

    • for tickKey, tick := range newTicksMap { iterates over the map and loads values into the new channelStocks channel,
    • closes the new channelStocks channel
    • Waits for SseHandlerRealTime to finish, but it never will because it’s waiting on the old channelStocks channel value.
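
The part of this guess that relies on Go semantics is that a for ... range loop evaluates its channel expression once, so reassigning the variable afterwards does not redirect a receiver that is already looping. Here is a minimal sketch of just that piece. The names feed and swapDemo are made up for illustration (feed stands in for channelStocks), and the signalling channels exist only to keep the demo itself free of data races:

```go
package main

import (
	"fmt"
	"time"
)

// feed is a hypothetical stand-in for the package-level channelStocks variable.
var feed chan int

// swapDemo returns the values the receiver saw, and whether the receiver was
// still blocked after the replacement channel had been filled and closed.
func swapDemo() (seen []int, stuck bool) {
	feed = make(chan int, 4)
	old := feed

	started := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		first := true
		for v := range feed { // feed is evaluated once, right here
			seen = append(seen, v)
			if first {
				first = false
				close(started)
			}
		}
	}()

	old <- 1
	<-started // the receiver is now inside the loop, bound to the old channel

	feed = make(chan int, 4) // like the cancel goroutine replacing channelStocks
	feed <- 2
	close(feed) // closes only the new channel

	select {
	case <-finished:
	case <-time.After(100 * time.Millisecond):
		stuck = true // the receiver never noticed the new channel
	}

	close(old) // release the receiver so the demo can exit cleanly
	<-finished
	return seen, stuck
}

func main() {
	fmt.Println(swapDemo()) // prints "[1] true"
}
```

The receiver only ever sees the value sent on the old channel, and it stays blocked until the old channel itself is closed. Whether that is what actually happens in your program is still a guess.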

This guess doesn’t really explain how moving doRealTime out to a package-level variable fixes it. However, I can say that writing to and reading from the doRealTime and channelStocks variables from two or more goroutines without any synchronization is a data race, so you don’t get any guarantees about the relative ordering of those operations. A potential fix could be to use atomics instead:

  • Switch doRealTime to an int32 initialized to 1
  • Change your for condition from just for doRealTime to for atomic.LoadInt32(&doRealTime) == 1
  • Change doRealTime = false to atomic.StoreInt32(&doRealTime, 0)
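
Put together, the three changes above could look roughly like this. It is only a sketch: runUntilCancelled, its cancel and work parameters, and the watcher WaitGroup are placeholders I made up to stand in for RealTimeQuotes, realTimeExecutionChannel, and the tick-fetching code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUntilCancelled is a hypothetical stand-in for the RealTimeQuotes loop.
// It calls work repeatedly until cancel is closed and returns the number of
// completed iterations.
func runUntilCancelled(cancel <-chan struct{}, work func()) int {
	var doRealTime int32 = 1 // 1 = keep running, 0 = stop

	var watcher sync.WaitGroup
	watcher.Add(1)
	go func() {
		defer watcher.Done()
		<-cancel
		atomic.StoreInt32(&doRealTime, 0) // replaces doRealTime = false
	}()

	iterations := 0
	for atomic.LoadInt32(&doRealTime) == 1 { // replaces for doRealTime
		work()
		iterations++
	}
	watcher.Wait() // make sure the watcher goroutine has exited
	return iterations
}

func main() {
	cancel := make(chan struct{})
	var once sync.Once
	n := runUntilCancelled(cancel, func() {
		once.Do(func() { close(cancel) }) // simulate the cancel button on the first pass
		time.Sleep(time.Millisecond)
	})
	fmt.Println("iterations before stop:", n)
}
```

On Go 1.19 or newer, atomic.Bool with its Load and Store methods should work the same way and reads a little closer to the original bool.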

Or a channel:

doRealTime := make(chan struct{})

// ...

go func() {
	<-realTimeExecutionChannel
	close(doRealTime)
	channelStocks = make(chan *data.StockRealTime, channelBufferSize)
	realTimeExecutionChannel <- struct{}{}
}()

for {
	if _, _, closed := tryRecv(doRealTime); closed {
		break
	}

	// the rest of the loop
}

func tryRecv[T any](ch chan T) (v T, received, closed bool) {
	select {
	case v, received = <-ch:
		closed = !received
	default:
	}
	return
}
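
In case the three return values of tryRecv are not obvious, here is a small self-contained demo of the states it can report. tryRecv is repeated so the snippet runs on its own; generics need Go 1.18 or newer, and the int channel is just an example.

```go
package main

import "fmt"

// tryRecv does a non-blocking receive on ch.
// received is true if a value was read; closed is true if ch is closed and
// drained. Both are false if ch is open but currently empty.
func tryRecv[T any](ch chan T) (v T, received, closed bool) {
	select {
	case v, received = <-ch:
		closed = !received
	default:
	}
	return
}

func main() {
	ch := make(chan int, 1)

	fmt.Println(tryRecv(ch)) // open and empty: 0 false false

	ch <- 7
	fmt.Println(tryRecv(ch)) // value waiting: 7 true false

	close(ch)
	fmt.Println(tryRecv(ch)) // closed and drained: 0 false true
}
```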

Can you use a single channel and just break out of the receive loop in SseHandlerRealTime when there is no more data?

var channelStocksFeed = make(chan []*data.StockRealTime, 30)

func SseHandlerRealTime(w http.ResponseWriter, request *http.Request) {
	stocksToSend := make([]*data.StockRealTime, 0)

	for {
		channelStocks, received, closed := tryRecv(channelStocksFeed)
		if !received || closed {
			// No (more) stocks for now; break to send what we have
			// (or an empty slice if nothing for now).
			break
		}
		// add this batch to the slice (stocksToSend) to send to browser
		stocksToSend = append(stocksToSend, channelStocks...)
	}

	jsonStock, _ := json.Marshal(stocksToSend)
	stringJSONStock := string(jsonStock)
	fmt.Fprintf(w, "data: %v\n\n", stringJSONStock)
}

func (realTime RealTime) RealTimeQuotes(){
	for {
		if _, received, closed := tryRecv(realTimeExecutionChannel); received || closed {
			break
		}

		// get latest ticks from websocket-based function

		channelStocks := make([]*data.StockRealTime, 0, len(newTicksMap))
		for tickKey, tick := range newTicksMap {
			var stockRealTime data.StockRealTime
			// populate stockRealTime 
			wg.Add(1)
			channelStocks = append(channelStocks, &stockRealTime)
		}

		channelStocksFeed <- channelStocks
	}
}
