Hello Everyone,
I am new to the Go language and am learning about concurrency in Go; one of the common patterns is the “pipeline”. Below is my take on the pattern. Please feel free to review it and let me know what I can improve.
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
// producer emits the integers 0, 1, 2, ... on jobsCh, pausing 50ms after
// each send, until ctx is done. It closes jobsCh before returning so that
// receivers ranging over the channel terminate.
func producer(ctx context.Context, jobsCh chan<- int) {
	// Deferred calls run last-in-first-out: the message is printed first,
	// then the channel is closed.
	defer close(jobsCh)
	defer fmt.Println("Stopping producer gorutine and jobsCh channel gracefully.")

	for i := 0; ; i++ {
		// Send the next job, or give up if the context ends while the
		// channel is full.
		select {
		case jobsCh <- i:
		case <-ctx.Done():
			return
		}

		// Pace the producer, but wake up as soon as the context is done
		// instead of sleeping through the cancellation.
		select {
		case <-time.After(50 * time.Millisecond):
		case <-ctx.Done():
			return
		}
	}
}
// worker receives integers from jobsCh until that channel is closed and
// sends the square of each one to resultCh. It calls wg.Done when it
// returns. It never closes resultCh.
func worker(jobsCh <-chan int, resultCh chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()

	for n := range jobsCh {
		resultCh <- n * n
	}
}
// result prints every value received from resultCh as "square <n>", one
// per line, and returns once the channel has been closed and drained.
func result(resultCh <-chan int) {
	for {
		square, open := <-resultCh
		if !open {
			return
		}
		fmt.Printf("square %d\n", square)
	}
}
// coordinator blocks until every goroutine tracked by wg has finished,
// then closes resultCh and reports the shutdown on standard output.
func coordinator(wg *sync.WaitGroup, resultCh chan<- int) {
	// Deferred so the message is emitted only after the channel is closed.
	defer fmt.Println("Stopping coordinator/worker gorutines and resultCh channel gracefully.")

	wg.Wait()
	close(resultCh)
}
func main() {
ctx, cancel := context.WithTimeout(
context.Background(),
1500*time.Millisecond
)
defer cancel()
jobsCh := make(chan int, 1024)
resultCh := make(chan int, 1024)
go producer(ctx, jobsCh)
var wg sync.WaitGroup
for range 3 {
wg.Add(1)
go worker(jobsCh, resultCh, &wg)
}
go coordinator(&wg, resultCh)
result(resultCh)
fmt.Printf("Pipeline fully drained !\n")
fmt.Printf("Number of gorutines : %d\n", runtime.NumGoroutine()) // should be 1
}
Also, could you point me to other common and important concurrency patterns, such as pub/sub?
Cheers,
DD