Thread safe map storage for concurrent writing

I’m working on an app that uses MapReduce to work out stats on word lists. There are going to be multiple map methods and I want to fire them off concurrently but obviously need to catch the results.

While it is single threaded, I’m using this structure:


Which gives me data that looks like:

{"length": ["1", "1", "33"], "colour": ["red", "blue", "red"]}

From what I’ve read, I think I need a queue system where I take the result from each map and add it to the queue; the queue worker then steps through each entry in turn and stores it in the data map. Assuming the standard queue is single threaded, it shouldn’t need a mutex to lock the map.

The reduce methods aren’t going to be called until all the maps are done so I don’t need to worry about those accessing the map.

A couple of questions: does this sound like the right way to solve the problem?

I’m worried that a single-threaded queue will negate the benefits of multithreaded maps, as it will become the bottleneck. If it does, would a multithreaded queue with a mutex be any better, given that the mutex still slows it down to almost single threaded?
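For comparison, a rough sketch of the mutex alternative, assuming a plain map wrapped in a hypothetical `store` type. The point it tries to show is that the lock is held only for the append, while the (presumably more expensive) map work runs outside it, so the goroutines are not serialised for most of their run time.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical wrapper: a plain map guarded by a mutex.
type store struct {
	mu   sync.Mutex
	data map[string][]string
}

func newStore() *store {
	return &store{data: make(map[string][]string)}
}

// add appends a value under key; the lock is held only for the append.
func (s *store) add(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = append(s.data[key], value)
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// The expensive map work would run here, outside the lock.
			s.add("length", fmt.Sprint(i))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(s.data["length"])) // prints 100
}
```

Whether this or a channel is faster depends on the workload, so it is worth benchmarking both before deciding.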

You should use channels if you are talking about goroutines. If you are using a distributed system, then you may consider a message queue, etc. Also take a look at Bolt or similar projects.

It will be goroutines so I’ll have a look at channels. Apart from the official docs, can you recommend any good examples?

The other option I’m going to offer users is Redis but this is for those who don’t want to have external dependencies.

Search YouTube for Go concurrency; there are several great videos. And keep programming: it will come after a few months, don’t worry.

There is no need to use Redis if you run this on one instance, look at Boltdb, I sent you the link.

No need to import a database for such a simple task. Do as follows:

  1. Your goroutines all share a channel (send-only from their side) on which they send a function that modifies the map. Make a type for it: type mapFunc func() error
  2. One goroutine manages the map: it reads from the shared channel and, for every function it receives, runs it.
  3. Remember that if the values you use are not immutable the way numbers or strings are (slices, maps and pointers, for example), you will have to be careful when you also use them outside the function you send on the channel.
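The three steps above could be sketched roughly as follows. `mapFunc` is the type named in step 1; `run`, `ops` and `done` are hypothetical names, and the doubling is only a stand-in for real map work.

```go
package main

import (
	"fmt"
	"sync"
)

// mapFunc is the type suggested above: a function that modifies the map.
type mapFunc func() error

// run starts n workers that each send one update function to the owner.
func run(n int) map[string][]string {
	data := make(map[string][]string)
	ops := make(chan mapFunc)
	done := make(chan struct{})

	// Step 2: a single owner goroutine is the only place the functions run.
	go func() {
		defer close(done)
		for f := range ops {
			if err := f(); err != nil {
				fmt.Println("update failed:", err)
			}
		}
	}()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		// Step 1: workers only see the send-only side of the channel.
		go func(i int, out chan<- mapFunc) {
			defer wg.Done()
			v := fmt.Sprint(i * 2) // computed in the worker, not the owner
			out <- func() error {
				// Step 3: v is a string, so capturing it here is safe.
				data["double"] = append(data["double"], v)
				return nil
			}
		}(i, ops)
	}

	wg.Wait()  // every function has been handed to the owner
	close(ops) // lets the owner's range loop end
	<-done     // the owner has run the last function
	return data
}

func main() {
	fmt.Println(len(run(1000)["double"])) // prints 1000
}
```

Sending functions rather than values keeps all knowledge of the map's layout with the caller; the owner goroutine stays generic.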

Post us the code you managed to write so far if you want more hints!

@acim On GitHub, the Boltdb project says it has been archived by the owner, but from reading his message it seems it is still perfectly usable. Would you agree?

@Giulio_Iotti The reason for using a database is that, although most of the lists I’ll be processing probably won’t be too large, occasionally I’ll need to run big lists through it, sometimes tens of MB. Keeping the results from 20 different checks in memory didn’t work well in Ruby and quickly filled the memory, so I wanted to give the user the option of pushing the storage to disk in these circumstances. I’ve not written any code yet, but your clues make sense, so I’ll see what I can get written.

This is what I’m trying to rewrite:

Tens of MBs, times 20, is just under 1 GB. I have 4 GB available on my phone. Give it a try; if you go OOM, then you can think of other options. :slight_smile:

I agree, you can just use a map, but regarding Bolt, I said Bolt or something like that, here is one more:

I’ll give it a try with just a map and see how it goes. Ruby didn’t like it but hopefully Go will cope better.

From the stdlib -

sync.Map is protected by a mutex, and I would rather use channels and select for read/write.

Channels are “protected” by mutexes too. Do some benchmarking and you might find that sync.Map is pretty speedy - that certainly is why it’s used inside Google.

Here’s the benchmark code for you:

That’s true :smile: It just looks nicer. It is easier for sure to use sync.Map, so I agree with you @clbanning.

This is what I’ve got so far with channels:

I initially had the problem of not all the goroutines finishing before I sent out the done and exited, so I added the WaitGroup. It still isn’t working: the last few goroutines sometimes don’t finish before the app exits. The double function should return 1998 every time but doesn’t:

channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
Message passed: 1998
channels $ go run basic.go |grep 1998
channels $ go run basic.go |grep 1998
Message passed: 1998

I tried it using wg as a module variable (right term?) and passing it through as a parameter just in case that made a difference but it didn’t. What have I done wrong?

And, do I need to kill handle_ret with the done message or can I leave it hanging after the wg.Wait() is done and I get on with the rest of the app? It seems like the right thing to do rather than leaving it.

Thought I’d got it from a comment on StackOverflow that said the wg.Add should be done before the goroutine rather than after but just tried that and it still fails.

I think I’ve worked out what is going on. All the add_one and double functions are being called but handle_ret is being closed before it has handled everything so the queue doesn’t get emptied and the results don’t get printed. But if that is the case, it would mean that the channel doesn’t handle things in the order that they were put into it as all the adds and doubles must have been called for the first wait group to finish. I’ve just tested this by removing the done and swapping the wg.Add round and it seems to be working now. Have I correctly fixed it or just got lucky?
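The code under discussion is not shown here, so the following is only a guess at its shape, with hypothetical Go-style names (`addOne`, `double`, `handleRet`, `runAll`). A channel does deliver values in the order they were sent; the likely gap is that wg.Wait() only proves every send has completed, not that the receiver has finished handling what it received. One way to close that gap, sketched under those assumptions, is to close the results channel after wg.Wait() and then wait for the handler to report that it has drained everything.

```go
package main

import (
	"fmt"
	"sync"
)

func addOne(n int, ret chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	ret <- n + 1
}

func double(n int, ret chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	ret <- n * 2
}

// handleRet drains ret until it is closed, then hands back what it saw.
func handleRet(ret <-chan int, done chan<- []int) {
	var seen []int
	for v := range ret {
		seen = append(seen, v)
	}
	done <- seen
}

func runAll(n int) []int {
	ret := make(chan int, 16)
	done := make(chan []int)
	go handleRet(ret, done)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(2) // count the goroutines before starting them
		go addOne(i, ret, &wg)
		go double(i, ret, &wg)
	}

	wg.Wait()     // every send has completed (some may still sit in the buffer)
	close(ret)    // tell handleRet that no more values are coming
	return <-done // block until handleRet has drained the channel
}

func main() {
	for _, v := range runAll(1000) {
		if v == 1998 {
			fmt.Println("Message passed:", v)
		}
	}
}
```

With this shape there is no separate "done" message racing against values still in the channel: closing the channel is the signal, and it can only be observed after everything sent before it has been received.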
