I need to loop over a large amount of data. We do not know in advance how many entities the input contains. Let’s assume it is like reading a CSV file line by line.
My goal is to limit the number of goroutines running in parallel, say to 10, so that at any moment at most 10 entities are being processed. I wrote a script with a listener that uses a channel as a semaphore to figure out whether all entities have been processed.
Here is the script:
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	goroutinesNum = 100
	quotaLimit    = 10
)

func startWorker(i int, wg *sync.WaitGroup, ch chan map[int]interface{}, increment chan int) {
	defer wg.Done()
	record := make(map[int]interface{})
	record[i] = "test " + strconv.Itoa(i)
	ch <- record
	<-increment
	cnt := len(ch)
	fmt.Print(fmt.Sprintln("increment (", i, ") [channels ", cnt, "]", strings.Repeat(" ", cnt), "█"))
	runtime.Gosched()
}

func main() {
	//runtime.GOMAXPROCS(1)
	wg := &sync.WaitGroup{}
	increment := make(chan int, 30)
	ch := make(chan map[int]interface{}, quotaLimit)
	wg.Add(1)
	go func(wg *sync.WaitGroup, increment chan int, ch chan map[int]interface{}) {
		defer wg.Done()
		for i := 0; i < goroutinesNum; i++ {
			increment <- i
			for len(ch) == quotaLimit {
				time.Sleep(time.Second * 1)
			}
			wg.Add(1)
			go startWorker(i, wg, ch, increment)
		}
	}(wg, increment, ch)
	time.Sleep(time.Second)
	for {
		for len(ch) > 0 {
			cnt := len(ch)
			fmt.Print(fmt.Sprintln("channel (", cnt, ")", strings.Repeat(" ", cnt), "■"))
			<-ch
		}
		if len(increment) == 0 {
			break
		}
	}
	wg.Wait()
}
I’m not sure I’m doing this right. I tried playing with select and timer constructs, but it seems I ended up with the same approach, only without the sleep-based wait. Perhaps it would be a good idea to use time.NewTicker or a context here. I have tried different approaches and cannot work out which one is correct. I would appreciate any critique and suggestions.
UPDATE:
I tried this solution as well:
But it does not work without a timeout, correct? I do not need a timeout, because I have to be sure that all rows have been processed, and the program has to wait for as long as that takes.
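For reference, waiting without any timeout can be sketched with a fixed pool of workers that read from one channel. This is only a minimal sketch, not the solution referred to above: the names runPool, rows and handle are hypothetical, and the generated "row N" strings stand in for real CSV lines. The producer closes the channel when the input ends, each worker's range loop then finishes, and wg.Wait blocks for as long as the processing takes.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool (hypothetical helper) starts `workers` goroutines that consume rows
// until the channel is closed, and returns the collected results once every
// row has been handled.
func runPool(rows <-chan string, workers int, handle func(string) string) []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out []string
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for row := range rows { // ends when rows is closed and drained
				res := handle(row)
				mu.Lock()
				out = append(out, res)
				mu.Unlock()
			}
		}()
	}
	wg.Wait() // blocks as long as needed; no timeout involved
	return out
}

func main() {
	rows := make(chan string)
	go func() {
		defer close(rows) // signals "no more input"
		for i := 0; i < 100; i++ {
			rows <- fmt.Sprintf("row %d", i) // stand-in for a CSV line
		}
	}()
	results := runPool(rows, 10, func(r string) string { return "done: " + r })
	fmt.Println("processed", len(results), "rows")
}
```

Because the channel is unbuffered, the producer never gets ahead of the workers, so the total number of rows does not have to be known in advance.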
UPDATE1:
I found that this part hangs if there are really a lot of goroutines:
	for len(ch) == quotaLimit {
		time.Sleep(time.Second * 1)
	}
It looks like this is the wrong approach. The question, then, is how to pause the loop until the queue has free slots?
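One common way to pause the loop is to let a buffered channel act as a counting semaphore: sending into it blocks while all slots are taken, so no len(ch) polling or sleeping is needed. The sketch below assumes a limit of 10 and uses hypothetical names (processAll, sem, handle); the short sleep only simulates work.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// processAll (hypothetical helper) runs handle for every item received from
// items, with at most `limit` calls in flight at the same time.
func processAll(items <-chan int, limit int, handle func(int)) {
	sem := make(chan struct{}, limit) // one slot per running goroutine
	var wg sync.WaitGroup

	for item := range items {
		sem <- struct{}{} // blocks here while all slots are taken
		wg.Add(1)
		go func(item int) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when this item is done
			handle(item)
		}(item)
	}
	wg.Wait() // wait for the goroutines that are still running
}

func main() {
	items := make(chan int)
	go func() {
		defer close(items) // end of input
		for i := 0; i < 100; i++ {
			items <- i
		}
	}()

	var mu sync.Mutex
	processed := 0
	processAll(items, 10, func(i int) {
		time.Sleep(10 * time.Millisecond) // simulated work
		mu.Lock()
		processed++
		mu.Unlock()
	})
	fmt.Println("processed", processed, "items")
}
```

Compared with a fixed worker pool, this variant starts one goroutine per item but never more than the limit at once, and the loop resumes as soon as any running goroutine releases its slot.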