Use map with sync/atomic package

Hey experts,

I am new to Golang. I was learning the sync/atomic package and came across the following code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type mapFunc struct {
	mapV map[int]int
}

func newMap() mapFunc {
	return mapFunc{
		mapV: make(map[int]int),
	}
}

func main() {
	var wg sync.WaitGroup
	var v atomic.Value
	//var mu sync.Mutex
	v.Store(mapFunc{mapV: map[int]int{0: 1, 1: 2}})

	go func() {
		var i int
		for {
			i++
			//mu.Lock()
			s := v.Load().(mapFunc)
			m := s.mapV
			m[0]++
			mapX := mapFunc{
				mapV: m,
			}
			v.Store(mapX)
			//mu.Unlock()
		}
	}()

	go func() {
		var i int
		for {
			i++
			//mu.Lock()
			s := v.Load().(mapFunc)
			m := s.mapV
			m[1]++
			mapX := mapFunc{
				mapV: m,
			}
			v.Store(mapX)
			//mu.Unlock()
		}
	}()

	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer wg.Done()
			//mu.Lock()
			mapX, ok := v.Load().(mapFunc)
			if !ok {
				panic("Error")
			}
			fmt.Println(mapX)
			//mu.Unlock()
		}()
	}

	wg.Wait()
}

I have one doubt. If I run the code above with the -race flag, I get a data race error, but when I use the locks, everything works fine. So my question is: how can I achieve this with only atomic.Value or the atomic package, without using mutexes? With a mutex I don’t need the Load and Store methods at all. I could simply initialize the map in the main goroutine and use:

mu.Lock()
m[0]++
mu.Unlock()

Any help would be appreciated. Thanks in advance

We can’t read and write a map at the same time, because a write can change the map's internal layout (for example, when it grows). This has to do with how hash maps are implemented. There are a number of great articles written about this, and here is a link to the actual source code.

So, if we want to write to a map, we always need to protect it with a mutex or some other mechanism that serializes map access. However, a trick we can use in this specific case is that the map doesn’t change on reads. Therefore, if we never change the contents of the map itself (never add, remove, or reassign a key), we can read from it in as many goroutines as we want.

What we can do is store pointers in the map. After creation we never change these pointers, just the values they point to. This is technically less efficient, since we now have to store a pointer and the actual value, but it does solve the race condition. I modified your code to use this principle, which looks like this:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// This function allows us to create pointers from literals
func intPtr(newInt int64) *int64 {
	return &newInt
}

func main() {
	var wg sync.WaitGroup

	mapX := map[int]*int64{0: intPtr(1), 1: intPtr(2)}
	go func() {
		for {
			// atomic version of mapX[0]++
			atomic.AddInt64(mapX[0], 1)
		}
	}()

	go func() {
		for {
			// atomic version of mapX[1]++
			atomic.AddInt64(mapX[1], 1)
		}
	}()

	wg.Add(5)
	for i := 0; i < 5; i++ {
		go func() {
			defer wg.Done()

			for key, value := range mapX {
				// we need to use atomic.LoadInt64 to synchronize with the atomic.AddInt64 calls
				fmt.Printf("%d = %d\n", key, atomic.LoadInt64(value))
			}
		}()
	}

	wg.Wait()
}

I hope this answers your question.

Yes, this will definitely work. But if you could explain a little bit about how I could do the same using Load and Store methods from atomic.Value?

Actually, the whole point behind this is that I want to use the atomic package for data types other than integers. That’s why I am using the Load and Store methods, but I am stuck with the race condition.

Actually, I have never used atomic.Value in practice. Internally it is just a pointer to some data, and it allows you to atomically update this pointer. In every example I have seen, it is used in an RCU (Read-Copy-Update) pattern. For example, this function is from the net/http package:

// RegisterProtocol registers a new protocol with scheme.
// The Transport will pass requests using the given scheme to rt.
// It is rt's responsibility to simulate HTTP request semantics.
//
// RegisterProtocol can be used by other packages to provide
// implementations of protocol schemes like "ftp" or "file".
//
// If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will
// handle the RoundTrip itself for that one request, as if the
// protocol were not registered.
func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) {
	t.altMu.Lock()
	defer t.altMu.Unlock()
	oldMap, _ := t.altProto.Load().(map[string]RoundTripper)
	if _, exists := oldMap[scheme]; exists {
		panic("protocol " + scheme + " already registered")
	}
	newMap := make(map[string]RoundTripper)
	for k, v := range oldMap {
		newMap[k] = v
	}
	newMap[scheme] = rt
	t.altProto.Store(newMap)
}

Another example is the one given in the docs of sync/atomic: link

To update the map we first have to copy it, then modify the copy, and then swap the copy in. Yet we still need a mutex for the writers, since no other writer may replace the map between our copying it and exchanging it, or that writer's update would be lost. Readers do not need the mutex. Most of the time people opt for using a sync.RWMutex in such scenarios.
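Applied to the map[int]int from the question, the copy-then-swap idea could look roughly like the sketch below. The cowMap type and its methods are hypothetical names made up for illustration, not part of any library. The key difference from the original code is that the map returned by Load is never modified: in the question, Load hands back the same underlying map every time, so m[0]++ still writes to shared state.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// cowMap is a hypothetical copy-on-write wrapper (illustrative name only).
type cowMap struct {
	mu sync.Mutex   // serializes writers; readers never take it
	v  atomic.Value // holds a map[int]int that is never mutated after Store
}

func newCowMap(initial map[int]int) *cowMap {
	c := &cowMap{}
	m := make(map[int]int, len(initial))
	for k, val := range initial {
		m[k] = val
	}
	c.v.Store(m)
	return c
}

// Get reads from the current snapshot without taking any lock.
func (c *cowMap) Get(key int) int {
	return c.v.Load().(map[int]int)[key]
}

// Inc copies the current map, increments key in the copy, and publishes the copy.
func (c *cowMap) Inc(key int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	old := c.v.Load().(map[int]int)
	next := make(map[int]int, len(old)+1)
	for k, val := range old {
		next[k] = val
	}
	next[key]++
	c.v.Store(next)
}

func main() {
	c := newCowMap(map[int]int{0: 1, 1: 2})
	var wg sync.WaitGroup

	// Two writers, one per key, as in the question (bounded here so the program ends).
	for key := 0; key < 2; key++ {
		wg.Add(1)
		go func(key int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.Inc(key)
			}
		}(key)
	}

	// Five readers that only load snapshots.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Get(0)
		}()
	}

	wg.Wait()
	fmt.Println(c.Get(0), c.Get(1)) // prints "1001 1002"
}
```

Every Inc allocates and fills a whole new map, so this only pays off when reads greatly outnumber writes.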

Since RCU requires us to copy the whole map/object, writes are slow for large maps/objects and require more memory to be allocated. The main benefit is that readers never have to wait on a Mutex or RWMutex to become unlocked. So there is a CPU vs. memory tradeoff. You might want to pick one over the other depending on your use case and requirements.
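For comparison, here is a minimal sketch of the sync.RWMutex alternative. The lockedMap type and its methods are hypothetical names for illustration. Writes modify the map in place without copying, but readers now have to wait whenever a writer holds the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedMap is a hypothetical wrapper (illustrative name only).
type lockedMap struct {
	mu sync.RWMutex
	m  map[int]int
}

func newLockedMap(initial map[int]int) *lockedMap {
	m := make(map[int]int, len(initial))
	for k, val := range initial {
		m[k] = val
	}
	return &lockedMap{m: m}
}

// Get takes the read lock, so many readers can run at once.
func (l *lockedMap) Get(key int) int {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.m[key]
}

// Inc takes the write lock and modifies the map in place, with no copy.
func (l *lockedMap) Inc(key int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.m[key]++
}

func main() {
	l := newLockedMap(map[int]int{0: 1, 1: 2})
	var wg sync.WaitGroup
	for key := 0; key < 2; key++ {
		wg.Add(1)
		go func(key int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				l.Inc(key)
			}
		}(key)
	}
	wg.Wait()
	fmt.Println(l.Get(0), l.Get(1)) // prints "1001 1002"
}
```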

Related to all of this is sync.Map, which you might want to take a look at if you are not already familiar with it. Again, it is a pretty niche data type that solves a very specific problem. As with everything, pick the right tool for the job.

Okay, thanks for providing the solution and explanation. Definitely, this will work. I will look into sync.Map as well.
