I need to implement the observer pattern with grpc.
There is a server that broadcast messages to clients
and then the clients return with an acknowledge.
Using a bidirectional stream:
rpc CommandStream(stream AcknowMessage) returns (stream CommandMessage);
Where the server sends CommandMessage to clients.
And the clients reply with AcknowMessage.
The server have a function to broadcast message when required,
also the function track aknow messages or timeout and return
a map of result like map[clientId]status.
So ask ChatGpt and get the next code, the program work well.
Even if an akwnoledge signal arrive after the timeout it doesn’t taked in count.
The code:
type client struct {
id string
stream pb.CommandService_CommandStreamServer
ackCh chan *pb.Acknowledgement
}
type server struct {
pb.UnimplementedCommandServiceServer
mu sync.Mutex
clients map[string]*client
}
func newServer() *server {
return &server{
clients: make(map[string]*client),
}
}
//Wen client start, it send CommandStream to suscribe
func (s *server) CommandStream(stream pb.CommandService_CommandStreamServer) error {
p, _ := peer.FromContext(stream.Context())
// del IP
clientID := p.Addr.String() //
//clientID := fmt.Sprintf("client-%d", time.Now().UnixNano())
c := &client{
id: clientID,
stream: stream,
ackCh: make(chan *pb.Acknowledgement),
}
s.mu.Lock()
s.clients[clientID] = c
s.mu.Unlock()
log.Printf("Client connected: %s", clientID)
defer func() {
s.mu.Lock()
delete(s.clients, clientID)
s.mu.Unlock()
log.Printf("Client disconnected: %s", clientID)
}()
// Load the ack response in the client channel
for {
in, err := stream.Recv() //Wait for answer
if err != nil {
return err
}
if ack := in.GetAck(); ack != nil {
//log.Printf("Received ACK from %s: %+v", clientID, ack)
log.Printf("Receive cmd: %s", ack.CommandId)
c.ackCh <- ack
}
}
}
func (s *server) BroadcastCommand(name, jsonArgs string) map[string]string {
s.mu.Lock()
defer s.mu.Unlock()
commandID := fmt.Sprintf("cmd-%d", time.Now().UnixNano())
log.Printf("Broadcast command: %v", commandID)
cmd := &pb.CommandMessage{
Payload: &pb.CommandMessage_Command{
Command: &pb.Command{
Id: commandID,
Name: name,
JsonArgs: jsonArgs,
},
},
}
results := make(map[string]string)
var wg sync.WaitGroup
mu := sync.Mutex{} // Protect results map
for _, c := range s.clients {
wg.Add(1)
go func(cl *client) {
defer wg.Done()
// Send command
if err := c.stream.Send(cmd); err != nil {
log.Printf("Error sending to %s: %v", c.id, err)
mu.Lock()
results[cl.id] = "send_failed"
mu.Unlock()
return
}
// Wait for ack or timeout
ackCh := make(chan string, 1)
begin := time.Now().UnixNano()
fmt.Printf(" Wait for ack or timeout \n")
reads := 0
go func() {
for {
select {
case ack := <-c.ackCh:
fmt.Printf(" read channel after: %d \n", time.Now().UnixNano()-begin)
fmt.Printf(" cmdId: %s \n", ack.CommandId)
reads++
ackCh <- ack.Status
case <-time.After(500 * time.Millisecond):
fmt.Printf(" timeout after: %d \n", time.Now().UnixNano()-begin)
ackCh <- "timeout"
}
}
}()
//fmt.Printf("many reads: %d \n", reads) // Always print 0, Why ??
status := <-ackCh
mu.Lock()
results[cl.id] = status
mu.Unlock()
}(c)
}
wg.Wait()
return results
}
But, inside the broadcast function there is a piece of code that I can’t understand:
go func() {
for {
reads++
select {
case ack := <-c.ackCh:
ackCh <- ack.Status
case <-time.After(500 * time.Millisecond):
ackCh <- "timeout"
}
}
}()
fmt.Printf("many reads: %d \n", reads) // Always print 0, Why ??
Why the read counter always is 0 ?
How the flow escape for the infinite loop without a return ?
Based on my channel knowledge a return is missing, but the code work well.
And if I add a return, the for loop is executed just one time, and sometimes
there are previous ack messages that has arrived after timeout,
Because previous command that has arrived after timeout are wrote to a channel, and the write operation is blocked until a channel read.
The sending goroutine blocks until another goroutine is ready to receive from that channel.
I always need to discard late arrival msg and get just the last one.
Is There a better way to write this?