I am producing data in 2 goroutines periodically, using time.NewTicker, and dumping it into a channel.
Multiple consumer goroutines read from the channel and dump the data into a slice.
I want to process this slice in a separate function, then reset it (slice = slice[:0]) and resume the producers.
This process should repeat every time the ticker ticks, until the user presses CTRL+C.
PROBLEMS
- I do not know how to pause the producers and then resume them once the slice has been processed
- I do not know how to check if consumers have fully drained the channel
- I do not know how to determine a safe moment to process the slice
MVCE
At the moment I can give you working code copied from my VS Code. If I manage to solve any of the problems listed above, I will update the code to reflect my progress. Here is the code:
package main
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// producer emits a batch of three records on dataChannel each time a value
// arrives on ticker, and returns as soon as ctx is cancelled.
//
// Every record is obtained by applying foo to the previously emitted record;
// the sequence starts from 0, so the first record sent is foo(0).
// waitGroup.Done is called when the function returns.
//
// Each send is paired with ctx.Done() so that a producer blocked on a full
// (or no longer read) channel still observes cancellation instead of hanging
// forever and stalling waitGroup.Wait().
func producer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel chan<- int, ticker <-chan time.Time, foo func(int) int) {
	defer waitGroup.Done()

	// recordset mimics complex data; i carries the last generated value
	// from one tick to the next so the data changes on every tick.
	i := 0
	recordset := make([]int, 3)

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker:
			// Simulate the data changing.
			for idx := range recordset {
				i = foo(i)
				recordset[idx] = i
			}
			// Dump the data into dataChannel, giving up if we are cancelled
			// while waiting for room in the channel.
			for _, record := range recordset {
				select {
				case dataChannel <- record:
				case <-ctx.Done():
					return
				}
			}
		}
	}
}
// consumer moves values from dataChannel into *slice until ctx is cancelled
// or dataChannel is closed, whichever happens first.
//
// Appends to *slice are guarded by mutex because several consumers share the
// same slice. waitGroup.Done is called when the function returns.
func consumer(ctx context.Context, waitGroup *sync.WaitGroup, dataChannel <-chan int, slice *[]int, mutex *sync.Mutex) {
	defer waitGroup.Done()

	for {
		select {
		case <-ctx.Done():
			return
		case value, ok := <-dataChannel:
			if !ok {
				// A closed channel is always ready and yields zero values;
				// stop here instead of appending spurious zeros.
				return
			}
			mutex.Lock()
			*slice = append(*slice, value)
			mutex.Unlock()
		}
	}
}
// main starts two producers and two consumers, lets them run until the
// process receives SIGINT or SIGTERM, then cancels them, waits for all of
// them to finish and prints whatever the consumers collected.
func main() {
	const tickInterval = 3 * time.Second

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigs)

	slice := make([]int, 0, 3)
	dataChannel := make(chan int, 100)
	mutex := &sync.Mutex{}
	waitGroup := &sync.WaitGroup{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Each producer gets its own ticker: a tick sent on a ticker channel is
	// received by exactly one goroutine, so sharing a single ticker would
	// wake only one of the two producers per tick.
	incTicker := time.NewTicker(tickInterval)
	defer incTicker.Stop()
	decTicker := time.NewTicker(tickInterval)
	defer decTicker.Stop()

	// We use simple incrementing of the records to mimic changing data.
	waitGroup.Add(1)
	go producer(ctx, waitGroup, dataChannel, incTicker.C, func(i int) int { return i + 1 })

	// We use simple decrementing of the records to mimic changing data;
	// this way the program's behavior is easy to follow in the terminal.
	waitGroup.Add(1)
	go producer(ctx, waitGroup, dataChannel, decTicker.C, func(i int) int { return i - 1 })

	for i := 0; i < 2; i++ {
		waitGroup.Add(1)
		go consumer(ctx, waitGroup, dataChannel, &slice, mutex)
	}

	// Block until the user asks us to stop. Cancelling ctx is what stops the
	// producers and consumers. dataChannel is deliberately NOT closed here:
	// closing it while a producer may still be sending would panic with
	// "send on closed channel".
	<-sigs
	cancel()
	waitGroup.Wait()

	// Let us check if everything went right. All goroutines have exited, so
	// slice can be read without holding mutex.
	fmt.Println("Final slice: ")
	for _, element := range slice {
		fmt.Printf("%d ", element)
	}
	fmt.Println()
}