Go routines with a flexible timeout

Consider the scenario where you have a server that handler requests via a request handler (refer to picture). The handler (via its workers) can take a long time, so you need a timeout just in case it never terminates. Requests that take a long time are acceptable in our case, as long as any one of the workers is progressing. However, the initial timeout that we set limits this. (I.e. is there a way to detect inactivity instead of a constant timeout?)

For instance, you have 6 workers working under a context containing a timeout of 5 seconds. Worker n takes n seconds to complete (e.g. worker 3 takes 3 seconds to finish).
Since we have a timeout for 5 seconds, the 6th worker will timeout. So, my question is, can you change this to refresh the timer at a later point during execution (e.g. when a worker finished or decides that it is progressing)?

Refer to image in the post below

In concrete terms, you can take a look at this program:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 10

	var wg sync.WaitGroup
	wg.Add(workers)

	timeout := 5 * time.Second

	ctx, _ := context.WithTimeout(context.Background(), timeout)

	for i := 1; i <= workers; i++ {
		go worker(ctx, i, &wg)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

// Worker will take index * time.Second to return
func worker(ctx context.Context, index int, wg *sync.WaitGroup) {
	defer wg.Done()
	// fmt.Println("Worker ", index, "waiting for ", index, "seconds")

	select {
	case <-ctx.Done():
		// Cancel worker
		fmt.Println("Worker ", index, "timed out")
		return
	case <-time.After(time.Duration(index) * time.Second):
		// Process finished after some time
		fmt.Println("Worker ", index, "finished")
	}
}

It outputs:

handling request
Worker  1 finished
Worker  2 finished
Worker  3 finished
Worker  4 finished
Worker  5 finished
Worker  6 timed out
All workers terminated

How can I allow worker 6 to finish, since it will still be within the 5 seconds of the last inactive go routine.

I got a message in this line ;

ctx, _ := context.WithTimeout(context.Background(), timeout)

You need to add a var to receive the cancel function so:
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

And I do not know if it is possible for your isecase increment the numers of seconds to timeput, say for example 10…

Context is not designed the way you want to use it. If I got your idea right, you probably can implement a heartbeat for workers and use it as a point where you define, that they are not active anymore. Eg you have a goroutine, which listens to the workers heartbeat and as soon as one of them stops, it gives, 5 seconds for others to finish, if in those 5 seconds another one has finished, then the timer counts another 5 seconds and so on. This mechanic will probably solve your issue, but it will require a lot of additional work to do, since if you still want your workers to timeout, you will have to pass their own context and hold somewhere all the cancel functions with the heartbeat channels.

1 Like

If I got your idea right, you probably can implement a heartbeat for workers and use it as a point where you define, that they are not active anymore.

Yes your idea can solve this problem. For instance, there can be one common process that acts as the heartbeat receiver. If it does not receive any feedback (from any workers) in a stretch of 5 seconds, then it uses the context’s cancel function to stop all workers. If it does receive any feedback within 5 seconds, then it should restart the timer from the beginning.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	fmt.Println("handling request")
	workers := 6

	var wg sync.WaitGroup
	wg.Add(workers + 2)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	heartbeat := make(chan bool, 1)

	go heartbeatReceiver(ctx, heartbeat, cancel, &wg)

	go infiniteWorkers(ctx, heartbeat, &wg)

	for i := 1; i <= workers; i++ {
		go worker(ctx, i, heartbeat, &wg)
	}

	wg.Wait()
	fmt.Println("All workers terminated")
}

func heartbeatReceiver(ctx context.Context, heartbeat <-chan bool, cancel context.CancelFunc, wg *sync.WaitGroup) {
	defer wg.Done()
	timeout := 5 * time.Second

	for {
		select {
		case <-time.After(timeout):
			// fmt.Println("timedout cancelling all go routines")
			cancel()
			return
		case <-heartbeat:
			fmt.Println("received heartbeat")
			// Restart timer
		}
	}
}

// Normal workers, will sleep for index * time.Second
func worker(ctx context.Context, index int, heartbeat chan<- bool, wg *sync.WaitGroup) {
	defer func() {
		// Send heartbeat when finished
		heartbeat <- true
		wg.Done()
	}()
	// fmt.Println("Worker ", index, "waiting for ", index, "seconds")

	select {
	case <-ctx.Done():
		// Cancel worker
		fmt.Println("Worker ", index, "timed out")
		return
	case <-time.After(time.Duration(index) * time.Second):
		// Process finished
		fmt.Println("Worker ", index, "finished")
	}
}

func infiniteWorkers(ctx context.Context, heartbeat chan<- bool, wg *sync.WaitGroup) {
	defer func() {
		// Send heartbeat when finished
		heartbeat <- true
		wg.Done()
	}()

	// worker will never finish
	fmt.Println("Worker will never finish unless cancelled...")

	select {
	case <-ctx.Done():
		// The context is over, stop processing results
		fmt.Println(" ->>> Worker cancelled!")
		return
	}
}

Output:

handling request
Worker will never finish unless cancelled...
Worker  1 finished
received heartbeat
Worker  2 finished
received heartbeat
Worker  3 finished
received heartbeat
Worker  4 finished
received heartbeat
Worker  5 finished
received heartbeat
Worker  6 finished
received heartbeat
 ->>> Worker cancelled!
All workers terminated

This works as expected; worker 6 is allowed to do its work, however a worker that never finished will be terminated.
Thanks!

1 Like

Just a piece of advice, for the heartbeat channels it’s better to use empty structs instead of bool type.

Just a piece of advice, for the heartbeat channels it’s better to use empty structs instead of bool type.

Any particular reasons?

Just a good practice. Empty structs do not require memory and moreover it increases code readability. Since when you see a channel with an empty struct, you will understand that it is used as a semaphore/lock in most cases. Instead of allocating memory for an unused boolean with no understanding from the first look if this value is actually needed somewhere

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.