Handling periodic data production and consumption with synchronized processing in Go

I am producing data in 2 goroutines periodically, using time.NewTicker and dump it into a channel.
Multiple consumer goroutines are reading from the channel and dump data into a slice.

I want to process this slice in a separate function, then reset it (slice = slice[:0]) and resume the producers.
This process should repeat every time ticker ticks until user presses CTRL+C.

PROBLEMS

  1. I do not know how to pause the producers and then resume them when slice is processed
  2. I do not know how to check if consumers have fully drained the channel
  3. I do not know how to determine a safe moment to process the slice

MVCE

At the moment I can give you working code copied from my VS Code, if I manage to solve any of the problems listed before, I will update the code to reflect my progress. Here is the code:

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"
)

func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
    defer waitGroup.Done()
    // recordset slice shall be used to mimick complex data,
    //variable i will be used to simulate data changing per every tick
    i := 0
    recordset := make([]int, 3)
    for {
        select {
        case <-ticker:
            // simulate data changing
            i = foo(i)
            recordset[0] = i
            i = foo(i)
            recordset[1] = i
            i = foo(i)
            recordset[2] = i
            // dump data into dataChannel
            for _, record := range recordset {
                dataChannel <- record
            }
        case <-ctx.Done():
            return
        }
    }
}

func consumer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
    defer waitGroup.Done()
    for {
        select {
        case <-ctx.Done():
            return
        case value := <-dataChannel:
            mutex.Lock()
            *slice = append(*slice, value)
            mutex.Unlock()
        }
    }
}

func main() {
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    slice := make([]int, 0, 3)

    dataChannel := make(chan int, 100)
    mutex := &sync.Mutex{}
    waitGroup := &sync.WaitGroup{}

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    waitGroup.Add(1)
    // we will use simple incrementing of slice elements to mimick changing of data
    go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i + 1 })
    waitGroup.Add(1)
    // we will use simple decrementing of slice elements to mimick changing of data
    // this way we can easilly follow program behavior from Terminal
    go producer(ctx, waitGroup, dataChannel, ticker.C, func(i int) int { return i - 1 })

    for i := 0; i < 2; i++ {
        waitGroup.Add(1)
        go consumer(ctx, waitGroup, dataChannel, &slice, mutex)
    }

    waitGroup.Add(1)
    go func() {
        defer waitGroup.Done()
        <-sigs
        cancel()
        close(dataChannel)
    }()

    waitGroup.Wait()

    // let us check if everything went right
    fmt.Println("Final slice: ")
    for _, element := range slice {
        fmt.Printf("%d ", element)
    }
}

Your code looks too messy. I rewrote it and the logic is not hard to see:

stop -> producer -> consumer -> print

So, it’s very simple to get the following code:

func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
	defer waitGroup.Done()
	// recordset slice shall be used to mimick complex data,
	//variable i will be used to simulate data changing per every tick
	i := 0
	recordset := make([]int, 3)
	for {
		select {
		case <-ticker:
			// simulate data changing
			i = foo(i)
			recordset[0] = i
			i = foo(i)
			recordset[1] = i
			i = foo(i)
			recordset[2] = i
			// dump data into dataChannel
			for _, record := range recordset {
				dataChannel <- record
			}
		case <-ctx.Done():
			return
		}
	}
}

func consumer(waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
	defer waitGroup.Done()
	for {
		value, ok := <-dataChannel
		if !ok {
			return
		}
		mutex.Lock()
		*slice = append(*slice, value)
		mutex.Unlock()
	}
}

func main() {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dataChannel := make(chan int, 100)
	slice := make([]int, 0, 3)

	// producer
	ticker := time.NewTicker(3 * time.Second) //Why do producers share a timer? This seems to be a bug.
	defer ticker.Stop()
	go func() {
		var wg sync.WaitGroup

		wg.Add(1)
		go producer(ctx, &wg, dataChannel, ticker.C, func(i int) int { return i + 1 })
		wg.Add(1)
		go producer(ctx, &wg, dataChannel, ticker.C, func(i int) int { return i - 1 })

		wg.Wait()
		close(dataChannel)
	}()

	// consumer
	var mutex sync.Mutex
	var waitGroup sync.WaitGroup
	for i := 0; i < 2; i++ {
		waitGroup.Add(1)
		go consumer(&waitGroup, dataChannel, &slice, &mutex)
	}

	// stop
	<-sigs
	cancel()
	waitGroup.Wait()

	// let us check if everything went right
	fmt.Println("Final slice: ")
	for _, element := range slice {
		fmt.Printf("%d ", element)
	}
}

Ticker is shared because both producers should produce data at the same interval.
Can you offer better approach?

The Time.Ticker is generally not shared. Because when the time is triggered, only one data will be generated. This means that when the time comes, only one producer will work.
If you expect producers to work simultaneously, you should use a channel to trigger producer work instead of sharing time.Ticker.