Concurrent Design - deadlocks

I’m new to Go. I decided to start learning it the other day, largely due to its first-class support for concurrency with channels.

I am building a toy application as a means of learning how to work with channels.
This application:

  1. Downloads files from a source location.
  2. Applies some transformation to them.
  3. Uploads the transformed files to an output location.

The transformation step is the slowest.
My ideal design would have downloaders gathering a handful of files at a time, just enough to keep the pool of transformers constantly supplied and busy. When a transformer is done with a file, it hands it off to the finishers to be uploaded.

Here is my attempt at this design, using time.Sleep to simulate long processing time.

package main

import (
	"fmt"
	"strconv"
	"time"
)

type task struct {
	name string
	path string
	startTime time.Time
}

func downloader(id int, toDownload chan task, toProcess chan task) {
	for {
		task := <- toDownload
		fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
		time.Sleep(5 * time.Second)
		toProcess <- task
	}
}

func transformer(id int, toProcess chan task, toFinish chan task) {
	for {
		task := <- toProcess
		fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
		time.Sleep(20 * time.Second)
		toFinish <- task
	}
}

func finisher(id int, toFinish chan task, finished chan task) {
	for {
		task := <- toFinish
		fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
		time.Sleep(7 * time.Second)
		finished <- task
	}
}

func fetchJobs(n int) *[]task {
	tasks := make([]task, n)
	for i := 0; i < n; i++ {
		name := "job_" + strconv.Itoa(i)
		tasks[i] = task{name: name}
	}
	return &tasks
}

func main() {
	start := time.Now()

	n := 10

	toDownload := make(chan task)
	toProcess := make(chan task)
	toFinish := make(chan task)
	finished := make(chan task)


	for i := 0; i < 4; i++ {
		go downloader(i, toDownload, toProcess)
	}

	for i := 0; i < 10; i++ {
		go transformer(i, toProcess, toFinish)
	}

	for i := 0; i < 4; i++ {
		go finisher(i, toFinish, finished)
	}

	jobs := fetchJobs(n)
	for _, j := range *jobs {
		toDownload <- j
	}

	for task := range finished {
		fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime) / time.Second)
	}

	fmt.Printf("all tasks finished in %s", time.Since(start) / time.Second)
}

And here is some of the output.

[processor_1]: processing job_0 
[processor_3]: processing job_6 
[processor_7]: processing job_5 
[processor_6]: processing job_4 
[downloader_2]: downloading job_9 
[processor_5]: processing job_7 
[downloader_1]: downloading job_8 
[processor_8]: processing job_8 
[processor_9]: processing job_9 
[finisher_1]: uploading job_0 
[finisher_3]: uploading job_1 
[finisher_2]: uploading job_3 
[finisher_0]: uploading job_2 
[finisher_0]: uploading job_4 
finished job_2 in 9.223372036s 
finished job_1 in 9.223372036s 
finished job_3 in 9.223372036s 
finished job_0 in 9.223372036s 
[finisher_3]: uploading job_5 
[finisher_1]: uploading job_7 
[finisher_2]: uploading job_6 
[finisher_2]: uploading job_9 
finished job_6 in 9.223372036s 
finished job_7 in 9.223372036s 
finished job_5 in 9.223372036s 
finished job_4 in 9.223372036s 
[finisher_1]: uploading job_8 
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
	/Users/.../go/src/drive_uploader/main.go:79 +0x4b9

goroutine 6 [chan receive]:
main.downloader(0x0, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135

goroutine 7 [chan receive]:
main.downloader(0x1, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135
...
...
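A side note on the output above: every task reports 9.223372036s because fetchJobs never sets startTime, so each task carries the zero time.Time. The hypothetical zeroElapsedLabel function below reproduces the question's print expression for such a task and explains where the number comes from.

```go
package main

import (
	"fmt"
	"time"
)

// zeroElapsedLabel evaluates the question's expression
// time.Since(task.startTime) / time.Second for a task whose startTime is unset.
func zeroElapsedLabel() string {
	var startTime time.Time // zero value: January 1, year 1, UTC
	// The true elapsed time does not fit in an int64 count of nanoseconds,
	// so time.Since saturates at the largest Duration (about 292 years).
	elapsed := time.Since(startTime)
	// Dividing a Duration by time.Second yields another Duration, here
	// 9223372036ns, which %s renders as "9.223372036s".
	return fmt.Sprintf("%s", elapsed/time.Second)
}

func main() {
	fmt.Println(zeroElapsedLabel()) // 9.223372036s

	// With a real start time, printing the Duration itself gives a sensible value.
	start := time.Now()
	time.Sleep(20 * time.Millisecond)
	fmt.Println(time.Since(start).Round(time.Millisecond))
}
```

The same division is why "all tasks finished in %s" would print a tiny nanosecond value rather than seconds; rounding the Duration (or calling its Seconds method) avoids that.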

Unfortunately I am getting a deadlock and I’m not clear what’s causing it.
I have tried tweaking the counts of each goroutine; some combinations are able to run successfully without a deadlock. This is less than ideal though.

I have also tried using buffered channels as well as a WaitGroup. Again… some combinations will run successfully and others won’t.

Any help or feedback would be appreciated! I am especially curious if there is a more idiomatic approach in Go.

package main

import (
	"fmt"
	"runtime"
	"strconv"
	"time"
)

func main() {
	start := time.Now()
	runtime.GOMAXPROCS(4)

	c := make(chan string)
	d := make(chan string)
	s := make(chan string)

	go makeCakeAndSend(c, "vanilla", 1)
	go makeCakeAndSend(d, "choco", 1)
	go makeCakeAndSend(s, "strawberry", 4)

	// Run the receiver in main itself: if it were started with go and main
	// did not wait for it, the program would exit before packing any cake.
	receiveAndPack(c, d, s)
	fmt.Printf("Process took %s \n", time.Since(start))
	// Layouts are written in terms of the reference time Mon Jan 2 15:04:05 2006.
	fmt.Println(time.Now().Format("January 2, 2006 3:04:05 pm"))
}

func makeCakeAndSend(c chan string, flavour string, count int) {
	defer close(c) // tell the receiver no more cakes of this flavour are coming

	for i := 0; i < count; i++ {
		cakeName := flavour + " cake " + strconv.Itoa(i)
		c <- cakeName
	}
}

func receiveAndPack(c chan string, d chan string, s chan string) {
	// A closed channel is always ready to receive from, so each channel is
	// set to nil once it is closed: a nil channel is never selected.
	// (A default case would instead turn this loop into a busy wait.)
	for c != nil || d != nil || s != nil {
		select {
		case cake, ok := <-c:
			if !ok {
				c = nil
				fmt.Println(" ... vanilla channel closed!")
			} else {
				fmt.Println(cake)
			}
		case cake, ok := <-d:
			if !ok {
				d = nil
				fmt.Println(" ... choco channel closed!")
			} else {
				fmt.Println(cake)
			}
		case cake, ok := <-s:
			if !ok {
				s = nil
				fmt.Println(" ... strawberry channel closed!")
			} else {
				fmt.Println(cake)
			}
		}
	}
	fmt.Println(" ... all channels closed!")
}

Maybe this can help you.

In your case, all of the goroutines end up deadlocked because none of the channels is ever closed, so main's range over finished can never end.
For example, each downloader is still waiting for a new task here:

task := <- toDownload

You can solve this by closing the toDownload channel once you are done adding tasks to it. After that, the downloader needs to check whether the channel has been closed:

for {
    task, ok := <- toDownload
    if !ok {
        break
    }
    // do something
}

Or you can use for range, which stops when the channel is closed:

for task := range toDownload { ... }

Similarly, you need to close each of the other channels, but only after every goroutine that sends on it has finished, so the remaining goroutines do not deadlock either.

Here is the final code:

package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

type task struct {
	name      string
	path      string
	startTime time.Time
}

func downloader(id int, toDownload chan task, toProcess chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toDownload {
		fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
		time.Sleep(5 * time.Second)
		toProcess <- task
	}
}

func transformer(id int, toProcess chan task, toFinish chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toProcess {
		fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
		time.Sleep(20 * time.Second)
		toFinish <- task
	}
}

func finisher(id int, toFinish chan task, finished chan task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range toFinish {
		fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
		time.Sleep(7 * time.Second)
		finished <- task
	}
}

func fetchJobs(n int) []task {
	tasks := make([]task, n)
	for i := 0; i < n; i++ {
		name := "job_" + strconv.Itoa(i)
		tasks[i] = task{name: name}
	}
	return tasks
}

func main() {
	start := time.Now()

	n := 10

	toDownload := make(chan task, n)
	toProcess := make(chan task, n)
	toFinish := make(chan task, n)
	finished := make(chan task, n)

	var dwg, twg, fwg sync.WaitGroup

	for i := 0; i < 4; i++ {
		dwg.Add(1)
		go downloader(i, toDownload, toProcess, &dwg)
	}

	for i := 0; i < 10; i++ {
		twg.Add(1)
		go transformer(i, toProcess, toFinish, &twg)
	}

	for i := 0; i < 4; i++ {
		fwg.Add(1)
		go finisher(i, toFinish, finished, &fwg)
	}

	// Report results as they arrive; done is closed once finished is drained.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for task := range finished {
			fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime).Round(time.Second))
		}
	}()

	for _, j := range fetchJobs(n) {
		j.startTime = time.Now() // record when the job entered the pipeline
		toDownload <- j
	}

	// No more jobs: downloaders leave their range loops once the channel is drained.
	close(toDownload)

	// Close each channel only after every goroutine that sends on it has returned.
	dwg.Wait()
	close(toProcess)

	twg.Wait()
	close(toFinish)

	fwg.Wait()
	close(finished)

	<-done // without this, main could exit before every result is printed
	fmt.Printf("all tasks finished in %s\n", time.Since(start).Round(time.Second))
}

And please don’t mind my English 🙂

This is another interesting way to use channels.

select still confuses me a bit… basically, if channel c is blocking, then it tries to read from channel d, and so on?

case cake, ok := <-c:
cake := <- c

Is this a feature specific to channels, where you can ignore the second value ok?

Ah I see how to properly use WaitGroup and close() now.
I had tried using close() within the body of each goroutine, but it’s obvious to me now why that won’t work… it closes the channel for the entire pool of workers.
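Closing from inside each worker also fails loudly at run time: the channel is shared by the whole pool, so the second worker to finish closes it again, which panics (and a worker still sending on it would panic with "send on closed channel"). The hypothetical closeTwice function below reproduces the first case and recovers the panic so its message can be printed.

```go
package main

import "fmt"

// closeTwice closes one channel twice, as happens when every worker in a
// pool calls close on the shared output, and returns the panic message.
func closeTwice() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int)
	close(ch) // first worker finishes
	close(ch) // second worker finishes: this panics
	return "no panic"
}

func main() {
	fmt.Println(closeTwice()) // close of closed channel
}
```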

Thanks Girish!

There is nothing to apologize for… your English is great!

1 Like

The select statement lets a goroutine wait on multiple communication operations.
A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready. (source)

It’s not specific to channels: the second value can also be ignored in a type assertion or when reading a value from a map by key.

package main

import "fmt"

func main() {
	var x interface{} = 23
	y, ok := x.(int) // ok is true if the type assertion succeeds
	z := x.(int)     // second value ignored; if the type assertion fails, this panics
	fmt.Println(y, ok, z)

	m := make(map[string]int)
	m["abc"], m["xyz"] = 123, 1123

	value, ok := m["abc"] // ok is true if key "abc" is present in the map, value = 123
	fmt.Println(value, ok)
	value = m["xyz"] // ignore second value
	fmt.Println(value)
	value, ok = m["pqr"] // here ok = false, value = 0 (zero value)
	fmt.Println(value, ok)
}

🙂

With functions that return two values: if I ignore the second then I get a compile error. Are these special cases then?

Yes

Gotcha.
Thanks for the help!