I’m new to Go. I decided to start learning it the other day, largely due to its first-class support for concurrency through channels.
I am building a toy application as a means of learning how to work with channels.
This application:
- Downloads files from a source location.
- Applies some transformation to them.
- Uploads the transformed files to an output location.
The transformation step is the slowest.
My ideal design would have downloaders gathering a handful of files at a time, just enough to keep the pool of transformers constantly supplied and busy. When a transformer is done with a file, it hands it off to the finishers to be uploaded.
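The design above maps onto a common Go pipeline pattern. Each stage runs a fixed pool of worker goroutines that range over an input channel. Once all of a stage's workers have returned, the stage closes its output channel, so the "no more work" signal travels from the first channel to the last. What follows is a minimal sketch under stated assumptions, not the author's code. `stage`, `sleepFor` and `runPipeline` are hypothetical helpers. The buffer size of 2 on the intermediate channels is an arbitrary stand-in for "a handful of files". The worker counts (4, 10, 4) are copied from the attempt in this question, and the sleeps are shortened to milliseconds.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// task is a trimmed-down version of the struct in the question.
type task struct {
	name string
}

// stage (hypothetical helper) starts `workers` goroutines that read from in,
// apply work, and send the result to out. When every worker has returned
// (in was closed and drained), stage closes out so the next stage can finish.
func stage(workers int, in <-chan task, out chan<- task, work func(task) task) {
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for t := range in {
				out <- work(t)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
}

// sleepFor (hypothetical helper) returns a work function that simulates a slow step.
func sleepFor(d time.Duration) func(task) task {
	return func(t task) task {
		time.Sleep(d)
		return t
	}
}

// runPipeline (hypothetical) pushes n jobs through download, transform and
// upload, and returns the names of the finished tasks.
func runPipeline(n int) []string {
	toDownload := make(chan task)
	// Small buffers let downloaders get only a few files ahead of the transformers.
	toProcess := make(chan task, 2)
	toFinish := make(chan task, 2)
	finished := make(chan task)

	stage(4, toDownload, toProcess, sleepFor(5*time.Millisecond)) // downloaders
	stage(10, toProcess, toFinish, sleepFor(20*time.Millisecond)) // transformers
	stage(4, toFinish, finished, sleepFor(7*time.Millisecond))    // finishers

	// Feed jobs from their own goroutine, then close the first channel.
	go func() {
		for i := 0; i < n; i++ {
			toDownload <- task{name: fmt.Sprintf("job_%d", i)}
		}
		close(toDownload)
	}()

	var names []string
	for t := range finished { // ends once the finishers' stage closes finished
		names = append(names, t.name)
	}
	return names
}

func main() {
	names := runPipeline(10)
	fmt.Println(len(names), "tasks finished")
}
```

Each channel is closed only by the side that sends on it, and the final `range finished` loop ends when `finished` is closed. As a result, the program can exit on its own without counting results in advance.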
Here is my attempt at this design, using time.Sleep to simulate long processing times:
```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

type task struct {
	name      string
	path      string
	startTime time.Time
}

func downloader(id int, toDownload chan task, toProcess chan task) {
	for {
		task := <-toDownload
		fmt.Printf("[downloader_%v]: downloading %v \n", id, task.name)
		time.Sleep(5 * time.Second)
		toProcess <- task
	}
}

func transformer(id int, toProcess chan task, toFinish chan task) {
	for {
		task := <-toProcess
		fmt.Printf("[processor_%v]: processing %v \n", id, task.name)
		time.Sleep(20 * time.Second)
		toFinish <- task
	}
}

func finisher(id int, toFinish chan task, finished chan task) {
	for {
		task := <-toFinish
		fmt.Printf("[finisher_%v]: uploading %v \n", id, task.name)
		time.Sleep(7 * time.Second)
		finished <- task
	}
}

func fetchJobs(n int) *[]task {
	tasks := make([]task, n)
	for i := 0; i < n; i++ {
		name := "job_" + strconv.Itoa(i)
		tasks[i] = task{name: name}
	}
	return &tasks
}

func main() {
	start := time.Now()
	n := 10

	toDownload := make(chan task)
	toProcess := make(chan task)
	toFinish := make(chan task)
	finished := make(chan task)

	for i := 0; i < 4; i++ {
		go downloader(i, toDownload, toProcess)
	}
	for i := 0; i < 10; i++ {
		go transformer(i, toProcess, toFinish)
	}
	for i := 0; i < 4; i++ {
		go finisher(i, toFinish, finished)
	}

	jobs := fetchJobs(n)
	for _, j := range *jobs {
		toDownload <- j
	}

	for task := range finished {
		fmt.Printf("finished %v in %s \n", task.name, time.Since(task.startTime)/time.Second)
	}

	fmt.Printf("all tasks finished in %s", time.Since(start)/time.Second)
}
```
And here is some of the output.
```
[processor_1]: processing job_0
[processor_3]: processing job_6
[processor_7]: processing job_5
[processor_6]: processing job_4
[downloader_2]: downloading job_9
[processor_5]: processing job_7
[downloader_1]: downloading job_8
[processor_8]: processing job_8
[processor_9]: processing job_9
[finisher_1]: uploading job_0
[finisher_3]: uploading job_1
[finisher_2]: uploading job_3
[finisher_0]: uploading job_2
[finisher_0]: uploading job_4
finished job_2 in 9.223372036s
finished job_1 in 9.223372036s
finished job_3 in 9.223372036s
finished job_0 in 9.223372036s
[finisher_3]: uploading job_5
[finisher_1]: uploading job_7
[finisher_2]: uploading job_6
[finisher_2]: uploading job_9
finished job_6 in 9.223372036s
finished job_7 in 9.223372036s
finished job_5 in 9.223372036s
finished job_4 in 9.223372036s
[finisher_1]: uploading job_8
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
	/Users/.../go/src/drive_uploader/main.go:79 +0x4b9

goroutine 6 [chan receive]:
main.downloader(0x0, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135

goroutine 7 [chan receive]:
main.downloader(0x1, 0xc000066060, 0xc0000660c0)
	/Users/.../go/src/drive_uploader/main.go:17 +0x74
created by main.main
	/Users/.../go/src/drive_uploader/main.go:63 +0x135
...
...
```
Unfortunately, I am getting a deadlock, and it’s not clear to me what’s causing it.
I have tried tweaking the number of goroutines in each stage; some combinations run successfully without a deadlock. That is less than ideal, though.
I have also tried using buffered channels, as well as a WaitGroup. Again, some combinations run successfully and others don’t.
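Buffered channels and completion are separate concerns. A buffer only bounds how many values a sender can queue before it blocks. It never tells a receiver that no more values are coming. Only `close` does that, and a `for ... range` loop over a channel exits only once the channel has been closed and drained. Here is a small, self-contained sketch of both properties (`fillAndDrain` is a hypothetical name used for illustration).

```go
package main

import "fmt"

// fillAndDrain tries to send capacity+1 values into a buffered channel without
// blocking, then closes it and ranges over whatever was buffered.
func fillAndDrain(capacity int) (accepted int, received []int) {
	ch := make(chan int, capacity)

	for i := 0; i <= capacity; i++ {
		select {
		case ch <- i:
			accepted++
		default:
			// Buffer full: a plain `ch <- i` here would block until a receiver arrives.
		}
	}

	// Without close, the range below would block once the buffer is empty
	// (in this single-goroutine program, the runtime would report a deadlock).
	close(ch)
	for v := range ch {
		received = append(received, v)
	}
	return accepted, received
}

func main() {
	accepted, received := fillAndDrain(3)
	fmt.Println(accepted, received)
}
```

With a capacity of 3, `fillAndDrain(3)` accepts three sends, skips the fourth, and returns `3` and `[0 1 2]`.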
Any help or feedback would be appreciated! I am especially curious whether there is a more idiomatic approach in Go.