Semaphore for an unknown number of data rows

I need to loop over a large amount of data. We do not know in advance how many entities the input contains. Let’s assume it’s like reading a CSV file line by line.

My goal is to limit the number of goroutines running in parallel, let’s say to 10. That means at most 10 entities can be processed at any moment. I wrote a script in which a listener uses a channel as a semaphore to figure out whether all entities have been processed.

Here is the script:
package main

import (
	"fmt"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"
)

const (
	goroutinesNum = 100
	quotaLimit    = 10
)

func startWorker(i int, wg *sync.WaitGroup, ch chan map[int]interface{}, increment chan int) {
	defer wg.Done()

	record := make(map[int]interface{})
	record[i] = "test " + strconv.Itoa(i)

	ch <- record
	<-increment

	cnt := len(ch)

	fmt.Print(fmt.Sprintln("increment (", i, ") [channels ", cnt, "]", strings.Repeat("  ", cnt), "█"))
	runtime.Gosched()
}

func main() {
	//runtime.GOMAXPROCS(1)
	wg := &sync.WaitGroup{}

	increment := make(chan int, 30)
	ch := make(chan map[int]interface{}, quotaLimit)

	wg.Add(1)
	go func(wg *sync.WaitGroup, increment chan int, ch chan map[int]interface{}) {
		defer wg.Done()
		for i := 0; i < goroutinesNum; i++ {

			increment <- i

			for len(ch) == quotaLimit {
				time.Sleep(time.Second * 1)
			}
			wg.Add(1)
			go startWorker(i, wg, ch, increment)
		}
	}(wg, increment, ch)

	time.Sleep(time.Second)
	for {
		for len(ch) > 0 {
			cnt := len(ch)
			fmt.Print(fmt.Sprintln("channel (", cnt, ")", strings.Repeat("  ", cnt), "■"))
			<-ch
		}

		if len(increment) == 0 {
			break
		}
	}

	wg.Wait()
}

I’m not sure I’m doing this right. I tried using select with a timer, but it seems I ended up with the same approach, just without the sleep-based wait. It would probably be a good idea to use NewTicker or a context here. I tried different approaches and cannot work out which one is correct. I would appreciate any critique and suggestions.

UPDATE:
I tried this solution as well:

But it does not work without a timeout, correct? I do not need a timeout, because I have to be sure that all rows have been processed, and it has to wait as long as it needs.

UPDATE1:
I found that this part hangs if there are really a lot of goroutines:

for len(ch) == quotaLimit {
	time.Sleep(time.Second * 1)
}

It looks like this is the wrong approach. The question then is: how do I pause the loop until the queue has free spots?
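
One common way to pause the loop without polling len(ch) is to let a send on a buffered channel do the blocking: when every slot is taken, the send waits until a worker frees one. Below is a minimal sketch of that idea, not a definitive implementation. The names runLimited, sem, and handle are made up for illustration, and the producer goroutine in main only stands in for a real line-by-line reader.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited calls handle once per row, with at most limit calls running at
// the same time. It returns how many rows were handled and the highest number
// of handlers that were active simultaneously.
// (Hypothetical helper, written for this example only.)
func runLimited(rows <-chan string, limit int, handle func(string)) (handled, peak int) {
	sem := make(chan struct{}, limit) // buffered channel used as a semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	active := 0

	for row := range rows { // ends when the producer closes rows
		sem <- struct{}{} // blocks while all slots are taken; no sleep or polling
		wg.Add(1)
		go func(row string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when this row is done

			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			handle(row)

			mu.Lock()
			active--
			handled++
			mu.Unlock()
		}(row)
	}
	wg.Wait() // wait for the rows that are still in flight
	return handled, peak
}

func main() {
	rows := make(chan string)
	go func() {
		defer close(rows)
		for i := 0; i < 100; i++ { // stands in for input of unknown length
			rows <- fmt.Sprintf("row %d", i)
		}
	}()
	handled, peak := runLimited(rows, 10, func(string) {})
	fmt.Println("handled:", handled, "peak:", peak)
}
```

The loop never needs to know the total number of rows: it stops when the producer closes the channel, and wg.Wait covers whatever is still running at that point.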

Your description is a bit vague, so this is what I understood:

  1. You have a set of data you want to process in parallel.
  2. You want to process at most 10 items in parallel at a time? Or is it 10 items overall?
  3. I assumed the first (at most 10 items processed in parallel), and with that in mind the code seems too complex for what you’re describing.

The following snippet takes a set of items and processes them in parallel, so let me know what I got wrong about your question and we can improve it.

package main

import (
	"fmt"
	"sync"
)

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7}
	maxWorkers := 10
	inputChan := make(chan int, maxWorkers)
	wg := sync.WaitGroup{}

	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			worker(inputChan)
			wg.Done()
		}()
	}

	go func() {
		// range over a slice yields (index, value); discard the index
		for _, item := range items {
			inputChan <- item
		}
		close(inputChan)
	}()

	wg.Wait()
}

func worker(input chan int) {
	for {
		item, more := <-input
		if more {
			item++
			fmt.Println(item)
		} else {
			break
		}
	}
}
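
Since the original question is about input of unknown length, here is a hedged variant of the same worker pool that reads lines from an io.Reader instead of a fixed slice. It is a sketch under assumptions: processLines, runSample, and handle are illustrative names, and the in-memory string stands in for a real CSV file.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// processLines feeds every line of r to a fixed pool of workers and returns
// the number of lines processed. The line count is not known in advance.
// (Hypothetical helper, written for this example only.)
func processLines(r io.Reader, workers int, handle func(string)) (int, error) {
	lines := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	count := 0

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lines { // exits once lines is closed and drained
				handle(line)
				mu.Lock()
				count++
				mu.Unlock()
			}
		}()
	}

	sc := bufio.NewScanner(r)
	for sc.Scan() {
		lines <- sc.Text() // blocks until some worker is free to receive
	}
	close(lines) // tell the workers there is no more input
	wg.Wait()
	return count, sc.Err()
}

// runSample runs processLines on a small in-memory input.
func runSample() (int, error) {
	input := strings.NewReader("a,1\nb,2\nc,3\n")
	return processLines(input, 10, func(line string) {
		fmt.Println("processed", line)
	})
}

func main() {
	n, err := runSample()
	fmt.Println("lines:", n, "err:", err)
}
```

Because the channel is unbuffered, the reader only advances as fast as the workers consume, so at most `workers` lines are being handled at any time regardless of how long the input is.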

Thank you for your response! Yes, you understood correctly.
In your code, if feeding the channel takes a long time (as below), will the worker call break?

for _, item := range items {
	time.Sleep(time.Second * 10) // simulate a slow producer
	inputChan <- item
}

In the meantime, it seems I found a solution on Stack Overflow:
https://play.golang.org/p/rSqcU9EZLzn

As far as I understand, ranging over jobs waits for elements in the channel until it is closed. Is that correct?

for j := range jobs {
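
For reference, a minimal sketch of that behavior: ranging over a channel blocks while the channel is empty and only ends once the channel has been closed and drained. The function name drain and the values sent are made up for this example.

```go
package main

import "fmt"

// drain receives from jobs until the channel is closed and returns the sum
// of everything it received. (Hypothetical helper for illustration.)
func drain(jobs <-chan int) int {
	sum := 0
	for j := range jobs { // waits when jobs is empty, exits only after close(jobs)
		sum += j
	}
	return sum
}

func main() {
	jobs := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
		close(jobs) // without this, drain would block forever
	}()
	fmt.Println(drain(jobs)) // prints 15
}
```

So a slow producer does not make the loop exit early; the receiver simply waits for the next value or for the close.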

I don’t understand this part of your question:

The channel is just a way to pass the items to multiple goroutines safely, so what do you mean by long running?

Maybe you mean the case where a worker has been waiting for an item from the channel for a while but didn’t get any?
If so, you need a select statement with two cases, one for pulling new items and one for a timeout, so the worker function will look like this:

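func worker(input chan int) {
	for {
		select {
		case item, more := <-input:
			if !more {
				// channel closed and drained; a bare break here would only
				// leave the select, not the for loop
				return
			}
			item++
			fmt.Println(item)
		case <-time.After(time.Second * 10):
			// no item arrived within 10 seconds
			return
		}
	}
}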

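This select waits for either the next item from the channel or the timeout, whichever comes first. If the timeout fires, the worker stops. Note that a bare break inside a select only exits the select statement, not the enclosing for loop, so leaving the loop takes a return or a labeled break.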
