Help improve Go concurrency code

This function fetches some XML content and saves it. One issue I encountered is how to send the pending items one at a time with case s.updates <- pending[0]: (pending is a slice of Item). The code currently fails with an index out of range error.

Here's the link to the full code: Better Go Playground

func (s *sub) loop() {
	var pending []Item // appended by fetch; consumed by send
	var next time.Time
	var err error
	var seen = make(map[string]bool) // set of item GUIDs
	for {
		// var updates chan Item
		var fetchDone chan fetchResult // if non-nil, fetch is running
		var fetchDelay time.Duration   // initially 0
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending {
			startFetch = time.After(fetchDelay)
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates) // tells receiver we're done
			return
		case <-startFetch:
			fmt.Println("start fetch")
			var fetched []Item
			fetchDone = make(chan fetchResult, 1)
			go func() {
				fetched, next, err = s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case s.updates <- pending[0]:
			fmt.Println("add pending")
			if len(pending) > 0 {
				pending = pending[1:]
			}
		case result := <-fetchDone:
			fmt.Println("fetch done")
			fetchDone = nil
			if result.err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range result.fetched {
				if !seen[item.Channel.Link] {
					pending = append(pending, item)
					seen[item.Channel.Title] = true
				}
			}
		}
	}
}

Hello there,
The full code is in the link: Better Go Playground
The code fetches .xml data from the web and stores it. I want to fetch 10 items at a time and store them one at a time by passing them to a channel named updates.
Thank you

There are a lot of problems with this code. The first is variable scoping: fetchDone is declared inside the for loop, so it is reset to nil on every iteration. This breaks most of your logic.
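As a rough sketch of how the loop could be repaired while keeping its shape: fetchDone moves outside the for loop, and the send case is guarded with a nil channel. Go evaluates pending[0] when the select is entered, even if that case is never chosen, which is what triggers the index out of range error on an empty slice. A send on a nil channel never proceeds, so the case is simply disabled until there is something to send. The Item, Fetcher, and fakeFetcher definitions below are hypothetical stand-ins for the types in the playground code, and keying seen by a GUID field is an assumption. Whatever key is used, the key that is checked and the key that is stored should be the same, unlike the Link/Title mix in the question.

```go
package main

import (
	"fmt"
	"time"
)

// Item is a hypothetical stand-in for the feed item type in the question.
type Item struct{ GUID string }

type fetchResult struct {
	fetched []Item
	next    time.Time
	err     error
}

// Fetcher mirrors the shape of s.fetcher.Fetch() in the question (assumed).
type Fetcher interface {
	Fetch() ([]Item, time.Time, error)
}

type sub struct {
	fetcher Fetcher
	updates chan Item
	closing chan chan error
}

const maxPending = 10

func (s *sub) loop() {
	var pending []Item
	var next time.Time
	var err error
	var fetchDone chan fetchResult // declared once, so it survives iterations
	seen := make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending {
			startFetch = time.After(fetchDelay)
		}
		// Enable the send case only when there is something to send.
		var first Item
		var updates chan Item // nil: a send on a nil channel never proceeds
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates
		}
		select {
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case <-startFetch:
			ch := make(chan fetchResult, 1)
			fetchDone = ch
			go func() {
				// Locals only: the goroutine shares no state with the loop.
				items, nextTime, fetchErr := s.fetcher.Fetch()
				ch <- fetchResult{items, nextTime, fetchErr}
			}()
		case updates <- first:
			pending = pending[1:]
		case result := <-fetchDone:
			fetchDone = nil
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range result.fetched {
				if !seen[item.GUID] {
					pending = append(pending, item)
					seen[item.GUID] = true
				}
			}
		}
	}
}

// fakeFetcher is a hypothetical Fetcher that returns three fixed items.
type fakeFetcher struct{}

func (fakeFetcher) Fetch() ([]Item, time.Time, error) {
	items := []Item{{GUID: "a"}, {GUID: "b"}, {GUID: "c"}}
	return items, time.Now().Add(time.Hour), nil
}

func main() {
	s := &sub{fakeFetcher{}, make(chan Item), make(chan chan error)}
	go s.loop()
	fmt.Println(<-s.updates, <-s.updates, <-s.updates)
	errc := make(chan error)
	s.closing <- errc
	fmt.Println("closed, err:", <-errc)
}
```

The fetch goroutine here hands its results back only through the channel instead of assigning to the loop's next and err directly, so the loop owns all of its state.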
Overall, your code is very complex for what you are trying to accomplish. There are established patterns for writing parallel functions (fan-out, fan-in) with sync.WaitGroup or similar mechanisms.

Try to make your code very easy to read: a group of worker goroutines that all read from a single channel where the work comes in and push their results into a single channel where the results come out. Use for w := range over the channel to handle closing of channels gracefully.
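A minimal sketch of that worker layout, assuming integer jobs and a made-up process function in place of the real fetch-and-store work (runWorkers and process are illustrative names, not from the playground code):

```go
package main

import (
	"fmt"
	"sync"
)

// process is a hypothetical unit of work; here it just squares its input.
func process(n int) int { return n * n }

// runWorkers starts the given number of goroutines that all read from jobs
// and write to the returned channel, which is closed once every worker exits.
func runWorkers(workers int, jobs <-chan int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // loop ends when jobs is closed and drained
				results <- process(j)
			}
		}()
	}
	go func() {
		wg.Wait() // all workers done, so no more sends on results
		close(results)
	}()
	return results
}

func main() {
	jobs := make(chan int)
	go func() {
		defer close(jobs) // closing jobs tells the workers to finish
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
	}()
	sum := 0
	for r := range runWorkers(3, jobs) {
		sum += r
	}
	// The order of results varies between runs; the sum does not.
	fmt.Println("sum of squares:", sum)
}
```

Closing jobs is the only shutdown signal the workers need, and closing results after wg.Wait() lets the consumer use a plain range loop as well.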
