Channels: when does the program flow exit from an infinite for { select }?

I need to implement the observer pattern with gRPC.
There is a server that broadcasts messages to clients,
and the clients then reply with an acknowledgement.

Using a bidirectional stream:
rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);

The server sends CommandMessage to the clients,
and the clients reply with AcknowMessage.

The server has a function that broadcasts a message when required.
The function also tracks the ack messages (or a timeout) and returns
a map of results like map[clientId]status.

So I asked ChatGPT and got the following code. The program works well:
even if an acknowledgement arrives after the timeout, it is not counted.
The code:


type client struct {
	id     string
	stream pb.CommandService_CommandStreamServer
	ackCh  chan *pb.Acknowledgement
}

type server struct {
	pb.UnimplementedCommandServiceServer
	mu      sync.Mutex
	clients map[string]*client
}

func newServer() *server {
	return &server{
		clients: make(map[string]*client),
	}
}
// When a client starts, it calls CommandStream to subscribe.
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
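	p, ok := peer.FromContext(stream.Context())
	if !ok {
		return fmt.Errorf("no peer information in stream context")
	}
	// Use the peer address (IP:port) as the client ID.
	clientID := p.Addr.String()
	//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())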

	c := &client{
		id:     clientID,
		stream: stream,
		ackCh:  make(chan *pb.Acknowledgement),
	}

	s.mu.Lock()
	s.clients[clientID] = c
	s.mu.Unlock()
	log.Printf("Client connected: %s", clientID)

	defer func() {
		s.mu.Lock()
		delete(s.clients, clientID)
		s.mu.Unlock()
		log.Printf("Client disconnected: %s", clientID)
	}()

	// Load the ack response in the client channel
	for {
		in, err := stream.Recv() //Wait for answer
		if err != nil {
			return err
		}
		if ack := in.GetAck(); ack != nil {
			//log.Printf("Received ACK from %s: %+v", clientID, ack)
			log.Printf("Receive cmd: %s", ack.CommandId)
			c.ackCh <- ack
		}
	}
}


func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()

	commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
	log.Printf("Broadcast command: %v", commandID)

	cmd := &pb.CommandMessage{
		Payload: &pb.CommandMessage_Command{
			Command: &pb.Command{
				Id:       commandID,
				Name:     name,
				JsonArgs: jsonArgs,
			},
		},
	}

	results := make(map[string]string)
	var wg sync.WaitGroup
	mu := sync.Mutex{} // Protect results map

	for _, c := range s.clients {
		wg.Add(1)

		go func(cl *client) {
			defer wg.Done()

			// Send command (use the cl parameter, not the loop variable c)
			if err := cl.stream.Send(cmd); err != nil {
				log.Printf("Error sending to %s: %v", cl.id, err)
				mu.Lock()
				results[cl.id] = "send_failed"
				mu.Unlock()
				return
			}

			// Wait for ack or timeout
			ackCh := make(chan string, 1)
			begin := time.Now().UnixNano()			
			fmt.Printf(" Wait for ack or timeout  \n")
			reads := 0
		
			go func() {

				for {

					select {
					case ack := <-cl.ackCh:
						fmt.Printf("  read channel after: %d  \n", time.Now().UnixNano()-begin)
						fmt.Printf("               cmdId: %s  \n", ack.CommandId)
						reads++
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):
						fmt.Printf("  timeout after: %d  \n", time.Now().UnixNano()-begin)
						ackCh <- "timeout"

					}
				}

			}()

			//fmt.Printf("many reads: %d  \n", reads) // Always print 0, Why ??

			status := <-ackCh
			mu.Lock()
			results[cl.id] = status
			mu.Unlock()
		}(c)
	}

	wg.Wait()
	return results
}

But, inside the broadcast function there is a piece of code that I can’t understand:

			go func() {

				for {
					reads++
					select {
					case ack := <-c.ackCh:						
						ackCh <- ack.Status

					case <-time.After(500 * time.Millisecond):						
						ackCh <- "timeout"
					}
				}

			}()
			fmt.Printf("many reads: %d  \n", reads) // Always prints 0. Why?

Why is the read counter always 0?
How does the flow escape from the infinite loop without a return?

Based on what I know about channels, a return is missing, but the code works well.
If I add a return, the for loop executes just once, and sometimes it picks up
a stale ack from a previous command that arrived after that command's timeout.
This happens because a late ack is written to the channel, and the write blocks until the channel is read:
the sending goroutine blocks until another goroutine is ready to receive from that channel.
I always need to discard late-arriving messages and keep just the last one.

Is there a better way to write this?

Take a look at this condensed version of the code you provided:

go func() {
    // loop sending values to channel
}()
fmt.Printf("many reads: %d  \n", reads)

The go keyword starts a new goroutine (a lightweight thread managed by the Go runtime) that runs the given function concurrently, which in this case is your inline function that sends values to the channel. The fmt.Printf call executes immediately after the goroutine is created; it does not wait for the goroutine to do anything, so reads is still 0 when it is printed.
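
To see the counter change, the caller has to wait for the goroutine before reading it. Here is a minimal sketch of that idea, assuming a hypothetical countReads helper that is not part of the original code; it uses sync.WaitGroup to block until the goroutine is done:

```go
package main

import (
	"fmt"
	"sync"
)

// countReads (hypothetical helper) increments a counter n times in a
// separate goroutine and returns the counter once that goroutine is done.
func countReads(n int) int {
	var wg sync.WaitGroup
	reads := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < n; i++ {
			reads++
		}
	}()

	// Without this Wait, the return below would run right away, most likely
	// before the goroutine has incremented anything (and it would be a data race).
	wg.Wait()
	return reads
}

func main() {
	fmt.Println(countReads(3)) // prints 3
}
```

This only works because the loop in the sketch is finite. In the original code the goroutine never finishes, so there is nothing to wait for.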

Thanks, that explains why reads is always 0.
The next question is:

How does the flow escape from the infinite loop without a return?

In the code you provided, there is no escape from the infinite loop other than a panic or the program stopping. In practice each goroutine ends up stuck: the caller reads ackCh only once, so after the goroutine has refilled the one-slot buffer, its next send to ackCh blocks forever. You probably want to put some kind of done-signaling logic in the code.

  1. So if I trigger one broadcast per second, after one hour I will have 3600 goroutines (per client) still alive, waiting for a panic or the end of the program?
  2. How would you implement the done signal? The for loop should receive all the messages and keep only the last one.

  1. That’s right.
  2. Based on my understanding of your code, you don’t actually need a separate goroutine to wait for the ack. Here’s how I would improve the code:

		go func(cl *client) {
			// Send command...

			// Wait for ack or timeout
			begin := time.Now().UnixNano()			
			fmt.Printf(" Wait for ack or timeout  \n")
			var status string
			select {
			case ack := <-c.ackCh:
				fmt.Printf("  read channel after: %d  \n", time.Now().UnixNano()-begin)
				fmt.Printf("               cmdId: %s  \n", ack.CommandId)
				status = ack.Status
			case <-time.After(500 * time.Millisecond):
				fmt.Printf("  timeout after: %d  \n", time.Now().UnixNano()-begin)
				status = "timeout"
			}

			// Add the status to the results...
		}(c)

Hey there. I’d say the main issue and the questions here come from a lack of understanding of not just channels, but goroutines in general. That can lead to more misunderstandings and misconceptions in the future. You can solve this by ranging over a channel, for example:

package main

import (
	"fmt"
	"time"
)

func main() {
	var i int

	ch := make(chan int)

	go func() {
		defer close(ch)

		for j := range 10 {
			ch <- j
			time.Sleep(time.Second)
		}
	}()

	for i = range ch {
	}

	fmt.Println(i)
}

In this case, when the loop ends, i holds the last value sent on the channel. You can add a channel of empty structs (chan struct{}) to signal the exit, and you can do that around your send/receive channels. There are more questions than answers, since it’s hard to tell for sure without more code. E.g., where is c.ackCh closed? What should “timeout” do? Is it only for logging?
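
As a rough illustration of the empty-struct signal, here is a sketch in which a reader keeps the latest value until a done channel is closed. The lastValue function and the channel names are made up for this example and do not come from the original code:

```go
package main

import "fmt"

// lastValue (hypothetical helper) reads from values until done is closed
// and returns the most recent value seen, or 0 if none arrived.
func lastValue(values <-chan int, done <-chan struct{}) int {
	last := 0
	for {
		select {
		case v := <-values:
			last = v
		case <-done:
			return last // the explicit way out of the for { select } loop
		}
	}
}

func main() {
	values := make(chan int) // unbuffered: each send completes only once it is received
	done := make(chan struct{})

	go func() {
		defer close(done) // closing done tells the reader to stop
		for j := 0; j < 5; j++ {
			values <- j
		}
	}()

	fmt.Println(lastValue(values, done)) // prints 4
}
```

Closing done, rather than sending on it, means any number of readers see the signal, and the closing goroutine never blocks.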

Another topic which is really important is when and how you use buffered channels.
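
One place where buffering matters is when the receiver may give up before the sender is ready. The sketch below, with made-up names (slowReply, senderExits) and arbitrary durations, shows a sender that gets stuck on an unbuffered channel after the receiver times out, but finishes when the channel has room for one value:

```go
package main

import (
	"fmt"
	"time"
)

// slowReply sends one value on out after delay, then closes exited to
// show that the goroutine was able to finish.
func slowReply(out chan<- string, delay time.Duration, exited chan<- struct{}) {
	time.Sleep(delay)
	out <- "ack" // blocks forever if out is unbuffered and nobody receives
	close(exited)
}

// senderExits reports whether the sending goroutine finishes even though
// the caller stops waiting before the reply arrives.
func senderExits(buffer int) bool {
	out := make(chan string, buffer)
	exited := make(chan struct{})
	go slowReply(out, 100*time.Millisecond, exited)

	select {
	case <-out:
	case <-time.After(10 * time.Millisecond): // the caller times out first
	}

	select {
	case <-exited:
		return true
	case <-time.After(500 * time.Millisecond):
		return false
	}
}

func main() {
	fmt.Println(senderExits(0)) // false: the sender is stuck on the send
	fmt.Println(senderExits(1)) // true: the buffer absorbs the late value
}
```

In the unbuffered case the sender goroutine is leaked, which is the same kind of leak discussed earlier in the thread.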