How to break out of an infinite loop that is blocked on one call for an unknown amount of time

Hi all,
In the following code my loop is stuck at the c.ReadMessage line for an indefinite time. It only moves on when a message is returned, and I don't know when that will happen. Say a termination signal arrives while the loop is blocked in c.ReadMessage: how should I terminate a loop that is not moving forward?

for {
	select {
	case <-sign:
		break // note: a bare break only leaves the select, not the for loop
	default:
		// my loop is stuck at the line below this comment
		// while sign receives the signal to terminate
		msg, _ := c.ReadMessage(-1)
		go processMessage(msg)
	}
}

Move the code that reads the message into a select case and add a timeout.

Something like this:

package main

import (
	"fmt"
	"time"
)

func main() {
	sign := make(chan bool)
	messages := make(chan int)
	go func() {
		for i := 0; i < 3; i++ {
			if i == 2 {
				// simulate a slow producer; a bare 10 would mean 10 nanoseconds
				time.Sleep(10 * time.Second)
			}
			messages <- i
		}
	}()

	for {
		select {
		case <-sign:
			return // a bare break would only leave the select, not the loop
		case msg := <-messages:
			fmt.Println(msg)
			// go processMessage()
		case <-time.After(time.Second * 5):
			fmt.Println("timed out")
			return
		}
	}
}

https://play.golang.org/p/8aYWKaxc15-

I can’t add time.After because I am getting data from a server that can return data at any time.

Could you provide the code of the c.ReadMessage() function?

If you want to make this function interruptible by sign, you could add a context with cancel. You then group ReadMessage and processMessage in a for loop in a goroutine that returns when the context’s Done channel is closed.

Another possibility is to modify the ReadMessage function so that it contains a select that returns when sign is closed.

If you can’t modify the implementation of ReadMessage, you are doomed.
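
To make the context idea concrete, here is a minimal sketch. It assumes a reader that can be interrupted, which is exactly what the ReadMessage in this thread is not: readMessage, consume and demo are hypothetical names for illustration, and the channel stands in for the message source.

```go
package main

import (
	"context"
	"fmt"
)

// readMessage is a hypothetical interruptible reader (not the Kafka API).
// It blocks until a message arrives on in or ctx is cancelled.
func readMessage(ctx context.Context, in <-chan string) (string, error) {
	select {
	case msg := <-in:
		return msg, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// consume reads and processes messages until ctx is cancelled and
// returns how many messages it processed.
func consume(ctx context.Context, in <-chan string, process func(string)) int {
	n := 0
	for {
		msg, err := readMessage(ctx, in)
		if err != nil {
			return n // context cancelled: stop reading
		}
		process(msg)
		n++
	}
}

// demo feeds two messages, then cancels the context, which plays the
// role of the termination signal.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan string)
	done := make(chan int)
	go func() {
		done <- consume(ctx, in, func(m string) { fmt.Println("processed", m) })
	}()
	in <- "a"
	in <- "b"
	cancel()
	return <-done
}

func main() {
	fmt.Println("messages processed:", demo())
}
```

The loop only stops promptly because the read itself selects on ctx.Done(). With a reader that blocks without looking at the context, cancelling has no effect until the read returns.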

This is a long function from the Kafka library:

func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error) {

	var absTimeout time.Time
	var timeoutMs int

	if timeout > 0 {
		absTimeout = time.Now().Add(timeout)
		timeoutMs = (int)(timeout.Seconds() * 1000.0)
	} else {
		timeoutMs = (int)(timeout)
	}
	for {
		ev := c.Poll(timeoutMs)
		switch e := ev.(type) {
		case *Message:
			if e.TopicPartition.Error != nil {
				return e, e.TopicPartition.Error
			}
			return e, nil
		case Error:
			return nil, e
		default:
			// Ignore other event types
		}

		if timeout > 0 {
			// Calculate remaining time
			timeoutMs = int(math.Max(0.0, absTimeout.Sub(time.Now()).Seconds()*1000.0))
		}

		if timeoutMs == 0 && ev == nil {
			//log.Println("Here is something",newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT))
			return nil, newError(C.RD_KAFKA_RESP_ERR__TIMED_OUT)
		}
	}

}

Actually, message processing begins after ReadMessage returns, and when a task is done the message’s index is written to a channel so that we can commit it. So we can’t just cancel the context: once processing has started, we need to wait until it is done. In my shutdown func I check the counter of processing tasks (a wait group) and then close the channel. But because of a race condition, a processor can start after the counter has already been checked. Shutdown then closes the channel, and processMessage tries to write to a channel that is already closed.

This is tricky. So you want to wait for completion of all processMessage goroutines and be able to interrupt ReadMessage.

Waiting for processMessage completion should be done with a sync.WaitGroup that you increment before starting the processMessage goroutine. In the goroutine, you put a defer wg.Done() instruction at the start. When you want to terminate and want to make sure all processMessage calls are done, you call wg.Wait().
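
A minimal sketch of that WaitGroup pattern, with a stand-in processMessage that only counts calls (the counter and the processAll helper are assumptions for illustration, not your code):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processMessage is a hypothetical stand-in for the real message handler.
func processMessage(msg string, processed *int64) {
	atomic.AddInt64(processed, 1)
}

// processAll starts one goroutine per message and returns the number of
// messages processed, but only after every goroutine has finished.
func processAll(msgs []string) int {
	var wg sync.WaitGroup
	var processed int64

	for _, msg := range msgs {
		wg.Add(1) // increment before the goroutine starts
		go func(m string) {
			defer wg.Done() // first statement, so it runs on every exit path
			processMessage(m, &processed)
		}(msg)
	}

	wg.Wait() // blocks until every Done has been called
	return int(atomic.LoadInt64(&processed))
}

func main() {
	fmt.Println("processed:", processAll([]string{"a", "b", "c"}))
}
```

Calling Add inside the new goroutine instead would let Wait return before the goroutine has been counted.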

Now, interrupting ReadMessage by closing sign is not possible because there is no select in the function. It calls Poll, which is itself a blocking function.

It looks like the solution is to use a timeout value different from -1. See the following code example. We specify a timeout value of 2 seconds, but it could be more, like 5 seconds. This will be the maximum delay before the closing of sign is taken into account.

loop:
for {
    select {
    case <-sign:
        break loop // a bare break would only leave the select
    default:
        // ReadMessage now returns after at most 2 seconds,
        // so sign is checked at least that often
        msg, _ := c.ReadMessage(2 * time.Second)
        if msg != nil {
            go processMessage(msg)
        }
    }
}
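
If you want to try the pattern without a Kafka broker, here is a self-contained sketch. fakeConsumer is a hypothetical stand-in whose ReadMessage only mimics the timeout behaviour; it is not the library's type, and run and demo are illustration names too.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimedOut = errors.New("timed out")

// fakeConsumer is a hypothetical stand-in for the Kafka consumer.
type fakeConsumer struct{ msgs chan string }

// ReadMessage returns the next message, or errTimedOut if none arrives
// within timeout.
func (c *fakeConsumer) ReadMessage(timeout time.Duration) (string, error) {
	select {
	case m := <-c.msgs:
		return m, nil
	case <-time.After(timeout):
		return "", errTimedOut
	}
}

// run reads until stop is closed and returns the messages it saw.
func run(c *fakeConsumer, stop <-chan struct{}, timeout time.Duration) []string {
	var seen []string
loop:
	for {
		select {
		case <-stop:
			break loop // leaves the for loop, not just the select
		default:
			msg, err := c.ReadMessage(timeout)
			if err != nil {
				continue // timed out: go back and check stop
			}
			seen = append(seen, msg)
		}
	}
	return seen
}

// demo sends two messages, then closes stop while run is blocked in a read.
func demo() []string {
	c := &fakeConsumer{msgs: make(chan string)}
	stop := make(chan struct{})
	done := make(chan []string)
	go func() { done <- run(c, stop, 50*time.Millisecond) }()
	c.msgs <- "a"
	c.msgs <- "b"
	close(stop)
	return <-done
}

func main() {
	fmt.Println(demo())
}
```

After stop is closed, run returns at the latest one timeout later, which is the trade-off you pick with the timeout value.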

I am already using a WaitGroup to keep track of the messages being processed. When SIGTERM and a response from ReadMessage arrive at the same time, the shutdown func gets past pm.Wait() because 0 messages are being processed. Meanwhile the other goroutine, the one handling c.ReadMessage, reads a message and increments the pm counter. Since pm.Wait() in shutdown has already returned, shutdown closes the channel, message processing then tries to write to that closed channel, and the program panics.

go func() {
	sig := <-gracefulStop
	log.Printf("[info] caught sig: %v closing", sig)
	e.actCount.Wait()
	close(e.syncChan)
	// we have closed this channel here,
	// but processMessage() tries to write
	// to this chan, which is closed
}()

In another func:

for {
	msg, err := e.consumer.ReadMessage(-1)
	if err != nil {
		continue
	}
	e.actCount.Add(1)
	// this adds one, but shutdown
	// already got 0 at e.actCount.Wait()
	// and the chan is closed by shutdown;
	// it is now passed to processMessage
	go e.processMessage(msg, e.syncChan)
}

And we can’t use a timeout in ReadMessage().

There will always be a race condition if the Add and Wait are done in different goroutines. You must try to do the Add and Wait in the same goroutine.

Why can’t you use a timeout in ReadMessage? It looks possible, judging by the code you gave.
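
One way to arrange that, as a sketch only: let the reading goroutine own the WaitGroup, and have it call Wait and close the channel itself once its loop has ended. The names below are hypothetical, and the in channel stands in for a ReadMessage call that returns on a timeout.

```go
package main

import (
	"fmt"
	"sync"
)

// consume reads from in until stop is closed. The same goroutine calls
// Add and, after the loop, Wait, so no Add can happen after Wait returns.
func consume(in <-chan int, stop <-chan struct{}, syncChan chan<- int) {
	var active sync.WaitGroup
loop:
	for {
		select {
		case <-stop:
			break loop
		case msg := <-in: // stand-in for a read that returns on timeout
			active.Add(1)
			go func(m int) {
				defer active.Done()
				syncChan <- m // report the processed index
			}(msg)
		}
	}
	active.Wait()   // every started worker has written its result
	close(syncChan) // safe: no writer remains
}

// demo feeds five messages, signals stop, and counts what arrives on
// syncChan before it is closed.
func demo() int {
	in := make(chan int)
	stop := make(chan struct{})
	syncChan := make(chan int, 16)
	go consume(in, stop, syncChan)
	for i := 0; i < 5; i++ {
		in <- i
	}
	close(stop)
	n := 0
	for range syncChan { // ends when consume closes the channel
		n++
	}
	return n
}

func main() {
	fmt.Println("indexes received:", demo())
}
```

With this layout the signal handler only has to close stop; it never touches the WaitGroup or the channel, so it cannot close the channel under a running worker.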

If I use a timeout in ReadMessage, the polling will be on our end and we will be polling the server at a fixed interval. If we use -1, the library polls by itself and gives us messages as soon as it gets them. If there are a lot of messages per second on the server, the service will sit idle for that fixed time and the queue will fill up with unprocessed messages.

Is there any way to break the loop when it is stuck at ReadMessage and SIGTERM is detected?

That’s not correct. The time we give as argument to ReadMessage is a timeout value, not the time to block until it returns. With a timeout of 2 or 5 seconds, ReadMessage will still return for every message as soon as it arrives. The only difference is that ReadMessage will return with an error and nil message if there are no messages for 2 or 5 seconds. That is when you check for termination signal. A 2 to 5 second waiting time is OK for humans trying to terminate the program.

The time we specify is not the period of polling. It’s the maximum waiting time for a message.
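
A small timing sketch shows the difference. readWithTimeout is a hypothetical stand-in for ReadMessage(timeout) built on a channel, so it only illustrates the semantics and is not the library's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// readWithTimeout is a hypothetical stand-in for ReadMessage(timeout):
// it returns as soon as a message is available and gives up after timeout.
func readWithTimeout(msgs <-chan string, timeout time.Duration) (string, bool) {
	select {
	case m := <-msgs:
		return m, true
	case <-time.After(timeout):
		return "", false
	}
}

// measure sends one message after delay and reports how long the read
// blocked and whether it returned a message.
func measure(delay, timeout time.Duration) (time.Duration, bool) {
	msgs := make(chan string, 1)
	go func() {
		time.Sleep(delay)
		msgs <- "hello"
	}()
	start := time.Now()
	_, ok := readWithTimeout(msgs, timeout)
	return time.Since(start), ok
}

// demo reports whether an early message came back well before the
// timeout, and whether a late message made the read time out.
func demo() (early bool, timedOut bool) {
	elapsed, ok := measure(20*time.Millisecond, 2*time.Second)
	early = ok && elapsed < time.Second
	_, ok = measure(time.Second, 100*time.Millisecond)
	return early, !ok
}

func main() {
	early, timedOut := demo()
	fmt.Println("early message returned before the timeout:", early)
	fmt.Println("late message caused a timeout:", timedOut)
}
```

In the first measurement the read waits only as long as the message takes to arrive, even though the timeout is 2 seconds.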

There is no way to stop the ReadMessage call. The only thing possible is to specify a maximum waiting time forcing ReadMessage to return if there are no messages. This pattern is frequently used because it is simpler to define a timeout value than to make a function stoppable.

Please try the code I suggested and you will see it works.

Yes, I have been working on that part. That was great advice, thanks @Christophe_Meessen

I just checked the Kafka Go client library and saw this method:
m, err := r.ReadMessage(context.Background())
Are you using an old version of the library?

The service I am working on is very time-sensitive, which is why I wanted to avoid checking a condition again and again while no message is being read.

for {
	if somecondition {
		// do this, then continue
	}
	select {
	case <-signterm:
		// terminate
	default:
		msg, err := c.ReadMessage(2 * time.Millisecond)
		// rest of program...
	}
}

I am using confluentinc’s confluent-kafka-go, not segmentio’s.

I don’t think it will reduce your performance at all. But anyway I see the library you are using needs a new reader implementation which must take context. Fork it and implement it.

Thanks, I will look into it.