Incoherent concurrency

I have recently decided to dip my toes into the concurrency pool and I’m currently losing the will to live. I know this is going to be simple for you well-seasoned Gophers, but it’s breaking me!

Let me give you some context. I have an interface (called Vendor) which has a single method Search. I am creating a struct which holds a slice of these Vendors. Like so:

type VendorAggregator struct {
	vendors []Vendor
}

VendorAggregator has a single method called Get which will start a goroutine for each of the elements in the vendors slice, aggregate the responses and send them back to the consumer. Simples??

Well, when I write a test to prove the function, I keep getting duplicate responses from only one of the elements in the slice.

type VendorMockA struct{}

func (va *VendorMockA) Search(p *models.Product) ([]models.Product, error) {
	return []models.Product{{"test", "vendora", 100, "g"}}, nil
}

type VendorMockB struct{}

func (vb *VendorMockB) Search(p *models.Product) ([]models.Product, error) {
	return []models.Product{{"test", "vendorb", 100, "g"}}, nil
}

func TestVendorAggregator_Get(t *testing.T) {
	// Given: a VendorAggregator with 2 mock services
	subject := &VendorAggregator{[]Vendor{
		&VendorMockA{},
		&VendorMockB{},
	},
	}

	// When: I request a single product
	actual, _ := subject.Get(&models.Product{"test", "test", 100, "g"})

	// Then: I expect a response from each vendor
	lenExpected := len(subject.vendors)
	if len(actual) != lenExpected {
		t.Errorf("Get returned %v results, but expected %v",
			len(actual), lenExpected)
	}
}

Admittedly, the test is not checking content. I do that through stepping through the code. Below is the value of actual.

[Image: Screen Shot 2017-01-10 at 21.20.48.png]

For completeness, here is the actual Get method:

func (va *VendorAggregator) Get(p *models.Product) ([]models.Product, error) {
	ch := make(chan *VendorResponse)
	responses := []models.Product{}

	for _, vendor := range va.vendors {
		// Using the go keyword, we define an anonymous function that takes a vendor type and product.
		// The function then makes a Search call on the vendor taking a requested product. We use the
		// returned resp and err to create an instance of our VendorResponse type and send it to the channel.
		go func(v Vendor, p *models.Product) {
			resp, err := vendor.Search(p)
			ch <- &VendorResponse{resp, err}
		}(vendor, p)
	}

	for {
		select {
		case r := <-ch:
			if r.err != nil {
				fmt.Println("with an error", r.err)
			}
			responses = append(responses, r.inventory...) // the three dots are important for concatenating 2 slices
			if len(responses) == len(va.vendors) {
				return responses, nil
			}
		case <-time.After(50 * time.Millisecond):
			log.Printf(".")
		}
	}

	return responses, nil
}

Any help would be gratefully received.

I don’t suppose you could share a runnable example?

The first points of suspicion for me would be

  • The for loop closing over the loop variable, but you dodged that bullet
  • The slices being returned in the responses, as those are references and we don’t see where they come from
  • The input data. It’s a slice of interfaces, interfaces hold pointers, you may have two pointers to the same vendor

Edit: Looking closer, those two points are probably not it. Still want the runnable example. :slight_smile:

Unrelated, probably:

  • You seem to be adding zero or more responses from each vendor (responses = append(responses, r.inventory...)) yet you still compare the number of results with the number of vendors, which seems possibly wrong.

As always, run your thing under the race detector, there may be nothing but it may catch your problem.

The problem is on this line:

resp, err := vendor.Search(p)

It should be

resp, err := v.Search(p)

You have to use v, not vendor, so that you use the parameter to the goroutine, not the loop variable.
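To make the difference concrete, here is a minimal, self-contained sketch of the corrected pattern. It is not the code from the question: the Vendor interface is simplified to a Search that returns a string, and vendorA, vendorB and searchAll are made-up names for illustration. For background, Go releases before 1.22 share a single loop variable across all iterations, which is why reading vendor inside the goroutine could see the wrong element.

```go
package main

import (
	"fmt"
	"sort"
)

// Vendor is a simplified stand-in for the interface in the question.
type Vendor interface {
	Search(query string) string
}

// vendorA and vendorB are hypothetical mocks used only for this sketch.
type vendorA struct{}

func (vendorA) Search(query string) string { return "vendora:" + query }

type vendorB struct{}

func (vendorB) Search(query string) string { return "vendorb:" + query }

// searchAll starts one goroutine per vendor and collects one result from each.
func searchAll(vendors []Vendor, query string) []string {
	ch := make(chan string)
	for _, vendor := range vendors {
		// The goroutine only touches its own parameter v,
		// never the loop variable vendor.
		go func(v Vendor) {
			ch <- v.Search(query)
		}(vendor)
	}

	// Receive exactly one result per vendor.
	results := make([]string, 0, len(vendors))
	for range vendors {
		results = append(results, <-ch)
	}
	sort.Strings(results) // arrival order is not deterministic
	return results
}

func main() {
	fmt.Println(searchAll([]Vendor{vendorA{}, vendorB{}}, "test"))
}
```

Because each goroutine reads only its own copy in v, it no longer matters when the scheduler happens to run it.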

@iandawes this indeed fixed the error. Wow, what a nasty bug! I’m surprised that even worked in the first place.

@calmh thanks for taking the time to respond. I will take a look at some of your recommendations - I haven’t seen the race detector before. As always you make some great suggestions to my posts and are a real asset to the community.

Yeah, I missed the loop variable as well, but in this case I think the race detector would indeed have caught it - try go test -race to begin with.

For the very reason you illustrated, I prefer the “x := x” idiom for shadowing loop variables, because it means that it’s not actually possible to access the original variables.

Another issue I see with the code is that it’s using the length of the responses slice to infer the number of responses, but each response can contain an arbitrary number of inventory items, so the two numbers are not guaranteed to match. For example, if one request returns an error, your loop will hang forever.
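As a rough sketch of both points, the loop below shadows the loop variable with “vendor := vendor” and counts one response per vendor instead of counting inventory items, so an error or an empty result cannot make it hang. The types are simplified stand-ins for the ones in the question, and get, okVendor and failingVendor are hypothetical names. The single overall timeout is my own assumption about what is wanted, not something taken from the original code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Product and Vendor are simplified stand-ins for the types in the question.
type Product struct{ Name, Vendor string }

type Vendor interface {
	Search(p *Product) ([]Product, error)
}

type vendorResponse struct {
	inventory []Product
	err       error
}

// get queries every vendor concurrently and waits for exactly one response
// per vendor, or until the timeout expires.
func get(vendors []Vendor, p *Product, timeout time.Duration) ([]Product, []error) {
	// Buffered so goroutines can still send and exit after a timeout.
	ch := make(chan vendorResponse, len(vendors))
	for _, vendor := range vendors {
		vendor := vendor // the "x := x" idiom: shadow the loop variable
		go func() {
			inv, err := vendor.Search(p)
			ch <- vendorResponse{inv, err}
		}()
	}

	var products []Product
	var errs []error
	deadline := time.After(timeout)
	// Count responses, not products: each vendor sends exactly once.
	for received := 0; received < len(vendors); received++ {
		select {
		case r := <-ch:
			if r.err != nil {
				errs = append(errs, r.err)
				continue
			}
			products = append(products, r.inventory...)
		case <-deadline:
			errs = append(errs, errors.New("timed out waiting for vendors"))
			return products, errs
		}
	}
	return products, errs
}

// okVendor returns two products, so item count and vendor count differ.
type okVendor struct{ name string }

func (v okVendor) Search(p *Product) ([]Product, error) {
	return []Product{{p.Name, v.name}, {p.Name + "-alt", v.name}}, nil
}

// failingVendor always returns an error and no products.
type failingVendor struct{}

func (failingVendor) Search(p *Product) ([]Product, error) {
	return nil, errors.New("vendor unavailable")
}

func main() {
	products, errs := get([]Vendor{okVendor{"vendora"}, failingVendor{}},
		&Product{Name: "test"}, time.Second)
	fmt.Println(len(products), "products,", len(errs), "errors")
}
```

With one vendor returning two items and the other failing, the loop still finishes after two receives, whereas comparing len(responses) with len(va.vendors) only terminates when the item count happens to equal the vendor count.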

This kind of problem maps well to an abstraction provided by a package I wrote a while back, github.com/juju/utils/parallel. This also makes it trivial to limit the amount of potential concurrency.

A link to a few different possible variants: https://play.golang.org/p/WC4HHkWhhO

I cribbed off some notes I found online, and I have to admit in the first instance I just wanted to get something working.

Now that I have this working I can look at refactoring and preventing this from happening again. I guess at this moment I’m more surprised that what I had was even valid. In my mind I was referencing something that didn’t even exist in the function’s scope - so, I would have expected a compilation error.

I usually name the arguments of closures to the name of the variable I will be passing in to avoid this sort of thing.
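A small sketch of that naming habit, using a made-up doubleAll function rather than anything from the thread: the closure parameters reuse the names i and n, so the loop variables of the same name are simply unreachable inside the goroutine.

```go
package main

import "fmt"

// doubleAll doubles each input in its own goroutine.
func doubleAll(inputs []int) []int {
	out := make([]int, len(inputs))
	done := make(chan struct{})
	for i, n := range inputs {
		// The parameters i and n shadow the loop variables of the same
		// name, so the body cannot refer to the outer ones by accident.
		go func(i, n int) {
			out[i] = n * 2 // each goroutine writes a distinct index
			done <- struct{}{}
		}(i, n)
	}
	// Wait for every goroutine before reading out.
	for range inputs {
		<-done
	}
	return out
}

func main() {
	fmt.Println(doubleAll([]int{1, 2, 3}))
}
```

The trade-off is the one mentioned in the thread: when reading such code it is harder to spot at a glance whether a name refers to the parameter or to the outer variable.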

Of course that practice helped me miss seeing the problem in the example code :blush:

Thanks for sharing the code samples. I implemented the first option as an improvement on what I had, and it fixed another issue :relieved:.

Now that I’m comfortable with that, I’m going to take a look at your library. Many thanks everyone!
