I have a map of ‘tasks’ that need to be completed by multiple goroutines. All routines must complete all tasks but no task can be run by more than one routine at the same time.
I initially created a map of mutexes to handle this. but then run into concurrency problems. So I created a Parent container with another mutex. The idea being, lock Mu, then lock, unlock the task mutexes. but of course this doesn’t work either because if the task mutex blocks then the Container gets blocked as well.
type Container struct {
Mu sync.mutex
tasks map[string]sync.mutex
}
How can I write this so that each goRoutine is not blocked unnecessarily and the code is threadsafe.
Make a chain of channels from one goroutine to the next. The first goroutine performs the first task and then passes on the next channel to the next goroutine, etc. A subsequent goroutine never sees the a task until it has been completed by the prior goroutine, so no explicit synchronization is necessary.
but would this not mean that goroutine 2 can only start after goroutine 1 has finished at least one work ? Even worse, goroutine N-1 can start only when goroutine N-2 has finished at least one work, and goroutine N-2 can start when goroutine N-3 has finished some work, and so on…
Yes, but simple use of mutex would likely result in the same behavior. To get full concurrency from the start you’d need more sophisticated scheduling such as marking which tasks have been completed by each goroutine or each goroutine maintaining a set of completed tasks, etc. Perhaps you could write a scheduler goroutine that reads each task and submits it to goroutines on other channels and it keeps track of which goroutines are working each task and which have completed each task. That would keep the goroutines themselves simple and sync-free.
The simple solution I first presented would be perfectly adequate and the loss of initial concurrency would be negligible if you have many short-running tasks. If you have relatively few long-running tasks, then the scheduler approach may be necessary.
Keep the map of tasks, and each goroutine G iterates on the map keys, trying to see if there is anyone left which is both a) not being processed by another goroutine and b) has not already been processed by G itself.
To solve b) each goroutine G needs to keep track of what tasks it already completed, but that is easy since the task names (keys of the task map) are unique.
Make sure to include adequate synchronization around all those map accesses capturing both test and set in the same sync. This is where it gets error-prone and allowing race conditions is easy.
This complexity of low-level synchronization is what inspires the first go proverb https://go-proverbs.github.io/. Go certainly provides the necessary primitives, but it is more idiomatic to use channel communication instead.
Last edit, I promise. If you (@tranman) haven’t done this level of asynchronous coding before, it’s worthwhile to do it a few times in moderately complex cases so that you come to appreciate the proverb.
Thank you, both of you for your ideas. For now, I implemented the cheap/simple option. Cos I need something now. I’ll look into the sync/atomic package when I got a bit more time
I’m going to reply to this because I don’t exactly know when you’ll have a bit more time
Do you mean that you have, let’s say 3 goroutines which I’ll name a, b, and c, and that all 3 of them have to work on every task (e.g. task 1 needs a, b, and c to execute on it and task 2 needs a, b, and c to execute on it, etc.)? Does the order of the tasks matter (i.e. does a need to be done before b and does b need to be done before c)?
If so, I would think you just need to to this:
type task struct { /* ... */ }
type taskResult struct {
key string
task task
}
results := make(chan taskResult, 8) // however much buffering you want
wg := sync.WaitGroup{}
// Iterate over the keys and values from your "main" goroutine; no locking
// necessary.
for k, v := range tasks {
wg.Add(1)
go func(key string, task task) {
defer wg.Done()
// You don't need any synchronization between a, b, and c because
// each worker does them in sequence, but each job as a whole is
// still done concurrently
doA(key, task)
doB(key, task)
doC(key, task)
results <- taskResult{key, task}
}(k, v)
}
go func() {
wg.Wait()
close(results)
}()
for _, res := range results {
// do something with the results. Maybe you need to update the map?
// You don't need any locks, though, because the map isn't being
// accessed concurrently.
tasks[res.key] = res.task
}
@skillian . thanks for your suggestions. You got that about right. Each consumer ( let’s call it), must execute all tasks and in the same order. As an added complication, the processing time for these tasks can vary, from a few short seconds to mins and even > an hour.
So as the number of consumers grows, it will become important that say when consumer C is processing a long task n, consumers A & B are still free to process tasks n+1, n+2 etc.
atm, I implemented a cheap solution, but I am sure it will not perform well as the system grows.
Anyway, thanks again, I am still mulling this over…
My understanding of Go is not good enough here. But looking at your code, I assume that instantiating a go routine per task in the for loop, will not guarantee the task precedence I need.