Regarding your thread "What's issue with this goroutine code?": your code does not reflect your objective. The design focus is not 5. Generally speaking, your problem is not far from the 5 dining philosophers example I gave you.
OK, let me think them through… I’m new to Go and to this kind of concurrent/goroutine programming, so I may need more time and more tips.
From your Step 2:
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"time"
)

func httpCall(url string) {
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	received, err := client.Get(url)
	if err != nil {
		panic(err)
	}
	defer received.Body.Close()
	fmt.Println("status: ", received.StatusCode)
	scanner := bufio.NewScanner(received.Body)
	for i := 0; scanner.Scan() && i < 5; i++ {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}

func main() {
	urls := []string{
		"http://gobyexample.com",
		"http://att.com",
		"http://domaintools.com",
		"http://microsoft.com",
		"http://google.com",
	}
	for i := 0; i < len(urls); i++ {
		httpCall(urls[i])
	}
}
It’s clearly visible that httpCall is your goroutine (or worker). You can now see that, in order to spin up 5 workers, your main would just have to do:
for i := 0; i < len(urls); i++ {
	go httpCall(urls[i])
}
In fact, you are already able to spin up n workers based on the number of urls, without specifying the count explicitly.
However, we have 2 problems:
- Can the worker dump the printout back to main (the one you’re looking for) instead of doing the fmt printing on its own? If yes, how?
- How does main know how long it should wait for all the independent goroutines to complete? Do we need a sync here to wait?
Incorporate these 2 problems into your Plan 1 to formulate Plan 2. Stop the code investigations. At this point, you should deal with the planning first.
Can the worker dump the printout back to main (the one you’re looking for) instead of doing the fmt printing on its own? If yes, how.
Yes, I think we can use a channel to achieve this. To do this, we can add a string channel as the 2nd argument of the goroutine and send scanner.Text() (a string) to this channel. In the main goroutine, we can receive this string from the channel, and we expect 5 receives from it.
How does main know how long should it wait for all independent goroutines to complete? Do we need a sync here to wait?
Continuing from the last answer: the main goroutine expects 5 receives from the channel, so I think I can build a for loop that receives from the channel 5 times, which syncs main up with the goroutines.
Let me know how you think.
Thanks
Accordingly, my plan 2 is updated as below:
- main:
  - make a string type channel
  - build a urls slice for 5 urls
  - create 5 goroutines for url search, accepting two arguments:
    - 1st argument is url string
    - 2nd argument is string type channel
  - use for loop to receive 5 strings from the channel and print them out to screen - this also syncs up with goroutines
- goroutine:
  - create http.Client with Timeout: 5 * time.Second
  - use client.Get(url) to scrape website as per given url
  - check error, and defer received.Body.Close()
  - read first 5 lines of html body and concatenate them to a string
  - check error
  - send result to channel
Excellent! Sharp. We now have a channel for contact switching between main and worker. So, remember my keywords:
WHAT
string channel for each dedicated worker
WHY
for workers to asynchronously reply back to main for printout.
WHO
owner: main
sender: worker
closer: ??? (is closing necessary?)
WHEN
creation: in the worker creation for loop. Main must manage the channels separately.
HOW
main as receiver, reads output from the channel.
worker as sender, sends all output to the channel.
You can now improve step 2 with the channel implementation and reply back.
Does counting the messages guarantee that all workers are done?
Example:
You sent 5 workers out to various shops to buy a random amount of bread. When they return, they inform you that they are back while passing you a random amount of bread per person. As the main person, which signal is the stronger guarantee: counting the bread, or the workers informing you that they’re back?
So if we plot it nicely…
WHAT
??? (which signal to use, which then determines the tool)
WHY
for main to sync up with all workers by waiting, so that main can end the program.
WHO
owner: main
sender: worker
WHEN
creation: at main side.
HOW
main as signal receiver.
worker as signal sender.
- We don’t know yet until we decide WHAT to use.
Can you find out which tool is the best candidate for this problem?
@jameswang2015, all right there?
@hollowaykeanho, sorry for the delay - super busy with my daytime development and now let’s continue this issue!
WHO
owner: main
sender: worker
closer: ??? (is closing necessary?)
In my step 2 plan, main relies on receiving exactly as many messages from the channel as there are workers, so close(chan) is not required in my case.
here is the code per my plan 2.
package main

import (
	"bufio"
	"fmt"
	"net/http"
	"time"
)

func httpCall(url string, result chan string) {
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	received, err := client.Get(url)
	if err != nil {
		result <- err.Error()
		return
	}
	defer received.Body.Close()
	fmt.Println("status: ", received.Status)
	scanner := bufio.NewScanner(received.Body)
	var res string
	for i := 0; scanner.Scan() && i < 5; i++ {
		res += scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		result <- err.Error()
		return
	}
	result <- url + "\n" + res
}

func main() {
	result := make(chan string)
	urls := []string{
		"http://gobyexample.com",
		"http://att.com",
		"http://domaintools.com",
		"http://microsoft.com",
		"http://google.com",
	}
	for i := 0; i < len(urls); i++ {
		go httpCall(urls[i], result)
	}
	for j := 0; j < len(urls); j++ {
		// Println instead of Printf: the received text is data, not a format string
		fmt.Println(<-result)
	}
}
I’ll need to think about the better tool or more tips to improve it.
There are 3 known tools at your disposal. I’m sure by now you know each of them in detail, so I’ll skip the explanation.
- channel
- mutex (sync.Mutex / sync.RWMutex)
- sync.WaitGroup
Take your time and choose your signal based on the available tools. No. 1 and No. 3 are interchangeable in my Philosophers example.
I think WaitGroup could be the best fit in this case. In a more general scenario, let’s say we have m workers and n jobs; then main can use wg.Add(m) to explicitly wait for signals from all workers once they have got all n jobs done.
- main:
  - create sync.WaitGroup, wg.Add(m)
  - create channel1 for sending jobs from main to goroutine
  - (optional) if main needs to receive info from goroutine, then create channel2 for receiving info from workers to main
  - create m workers
    - 1st argument: channel1 for sending jobs to workers
    - (optional) 2nd argument: channel2 for receiving info from workers
    - 3rd argument: &wg (avoid global variable)
  - assign jobs through channel1
    - require close(channel1) to send signal to workers to stop
  - receive info through channel2
    - no need for closing channel2 - main receives info from workers as long as workers have info sent to channel2; use WaitGroup to terminate the whole program
  - wg.Wait()
- workers:
  - keep receiving jobs through channel1
  - do the job
  - send result back to main via channel2
  - if channel1 is closed, then stop work and notify main
In this case main doesn’t expect to receive all results from each worker for each job; as long as there is a result sent into channel2, main can handle it. And main uses wg to sync up with the workers to terminate the entire program.
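For reference, the plan above could be sketched roughly as follows. This is only a minimal sketch, not the final code from this thread: the names runPool, jobs and results are made up for illustration, and closing the results channel from a helper goroutine after wg.Wait() is my own assumption (the plan says channel2 does not need closing), added so that main knows when to stop receiving.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// worker drains the jobs channel (channel1) and reports one result per job
// on the results channel (channel2).
func worker(id int, jobs <-chan string, results chan<- string, wg *sync.WaitGroup) {
	defer wg.Done() // tell main this worker has stopped
	for job := range jobs { // the loop ends once jobs is closed and drained
		results <- fmt.Sprintf("%s done by worker %d", job, id)
	}
}

// runPool (hypothetical helper) starts m workers, hands out n jobs and
// collects every result.
func runPool(m, n int) []string {
	jobs := make(chan string)
	results := make(chan string)
	var wg sync.WaitGroup

	wg.Add(m)
	for w := 0; w < m; w++ {
		go worker(w, jobs, results, &wg)
	}

	// Feed jobs from a separate goroutine so main stays free to receive results.
	go func() {
		for j := 0; j < n; j++ {
			jobs <- fmt.Sprintf("job %d", j)
		}
		close(jobs) // signal the workers to stop
	}()

	// Assumption: close results once every worker is done, so the range below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := runPool(3, 5)
	sort.Strings(out) // arrival order is not deterministic
	for _, line := range out {
		fmt.Println(line)
	}
}
```

Which worker handles which job varies from run to run, so only the number of results is predictable.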
By the way, how many workers should we normally create to make the best use of the resources for optimal performance?
Perfect! Now upgrade your code again. You should get what you want by now. Congratulations! You just got deadlock-free and race-free concurrency. The development is not that complicated, is it?
There is no straight answer on the number of workers. You need to run benchmarks to test it. Concurrency performance varies between operating systems and hardware.
This is an optional next step: how to control the number of goroutines spawned so that we don’t overload the CPU. Obviously, you can do another loop in main that syncs them up. That’s another story.
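One common way to cap the number of goroutines working at the same time is a buffered channel used as a counting semaphore. This is a different technique from the "another loop in main" mentioned above, shown only as a sketch; runLimited and its parameters are hypothetical names, and the sleep stands in for real work.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (hypothetical helper) starts one goroutine per task but lets at
// most limit of them work at the same time. It returns the highest number of
// simultaneously running tasks it observed.
func runLimited(tasks, limit int) int32 {
	sem := make(chan struct{}, limit) // buffered channel as counting semaphore
	var wg sync.WaitGroup
	var running, peak int32

	for t := 0; t < tasks; t++ {
		wg.Add(1)
		sem <- struct{}{} // blocks while limit goroutines hold a slot
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // free the slot (runs before wg.Done)

			now := atomic.AddInt32(&running, 1)
			for { // record the peak concurrency seen so far
				old := atomic.LoadInt32(&peak)
				if now <= old || atomic.CompareAndSwapInt32(&peak, old, now) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // stand-in for real work
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3))
}
```

The right value for limit still has to come from benchmarking on the target machine.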
As a wrap up, to plan for concurrency:
- Run it in a single process (main).
- Abstract the key functions into dedicated process (httpCalls). Isolate them.
- Refactor them to the finest.
- Identify the workers and their contact switching.
- Plan the controls between goroutines.
- Optimize it.
About your main printout, you can consider using a for...select loop. It is cleaner and more sensible. E.g.:
To integrate sync.WaitGroup into that for...select loop, you can follow tomaz’s way of doing things:
Or use 2 channels instead of sync.WaitGroup, like the one I implemented in Philosophers with the fullness and c1 channels. In your case, it looks something like:
for {
	select {
	case message := <-msg:
		... // handle message
	case <-ended:
		... // handle completed count
		done++ // increase done count
		if done == headcount {
			return // once everyone is done. Exit.
		}
	}
}
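A runnable version of that two-channel loop might look like the sketch below. The worker body and the collect helper are made up for illustration; each worker sends a different number of messages before it reports on ended, which is the "counting bread versus workers reporting back" situation from earlier in this thread.

```go
package main

import "fmt"

// worker sends a varying number of messages, then reports that it is done.
func worker(id int, msg chan<- string, ended chan<- int) {
	for i := 0; i <= id; i++ { // worker 0 sends 1 message, worker 1 sends 2, ...
		msg <- fmt.Sprintf("worker %d: message %d", id, i)
	}
	ended <- id // the completion signal main relies on
}

// collect (hypothetical helper) starts headcount workers and returns how many
// messages arrived before every worker reported done.
func collect(headcount int) int {
	msg := make(chan string)
	ended := make(chan int)
	for w := 0; w < headcount; w++ {
		go worker(w, msg, ended)
	}

	messages, done := 0, 0
	for done < headcount {
		select {
		case m := <-msg:
			fmt.Println(m) // handle message
			messages++
		case <-ended:
			done++ // one more worker has finished
		}
	}
	return messages
}

func main() {
	fmt.Println("total messages:", collect(3))
}
```

Because both channels are unbuffered and a worker only sends on ended after its last message has been received, main cannot miss a message by exiting when done reaches headcount.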
@hollowaykeanho Thanks
I’m rethinking this with four scenarios; let me know if this makes any sense to you.
- case1: one way communication (from main to goroutine only).
  - create m workers
  - assign n jobs
  - main won’t terminate until all goroutines get all jobs done
  - I’d like to use sync.WaitGroup{} to sync up between main and goroutines
- case2: one way communication, but any job that exceeds some preset time times out and gets ignored
  - instead of wg, I’d like to use a dedicated channel for goroutines to notify main whenever they get a job done. On main I’d like to use select to choose between this channel and a preset timer to achieve this timeout function
  - use for loop with the same number of iterations as number of jobs to sync up between main and goroutines
- case3: two way communication (main sends jobs to goroutines and goroutines send results back to main)
  - create n jobs
    - make these buffered channels with size of n - m to avoid deadlock
  - make another channel for goroutines to send results back to main
  - create m workers
  - assign n jobs
  - close(job) to notify goroutines
  - use for loop with the same number of iterations as number of jobs to receive the result of each job from goroutines
  - at goroutine, use for ... range ... to keep receiving jobs, do jobs, send results back to main, until notified that job is closed
  - basically, use the result channel to sync up main and goroutines
- case4: two way communication with a time limit: main times out any result from goroutines if that job isn’t done on time
  - use for loop with the same number of iterations as number of jobs
  - use select to choose between a result from a goroutine and a timer
  - basically, use the result channel to sync up between main and goroutines
Below is the code for each case.
// case1:
// workers: m
// jobs: n
// no response is required from goroutine back to main
// main won't terminate until all goroutines finish jobs
// use sync.WaitGroup
package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

const numberOfWorkers = 3
const numberOfJobs = 5

func worker(id int, job chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2 * time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(numberOfWorkers)
	job := make(chan string)
	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, &wg)
	}
	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)
	wg.Wait()
}
// case2
// workers: m
// jobs: n
// no response is required from goroutine back to main
// main won't terminate until all goroutines finish jobs
// unless the processing of a job by a goroutine exceeds the preset timer
// use channel + select + time.After to control this
package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers = 3
const numberOfJobs = 5

func worker(id int, job chan string, done chan bool) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(1 * time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
		done <- true
	}
}

func main() {
	job := make(chan string, 2)
	done := make(chan bool)
	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, done)
	}
	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)
	for r := 0; r < numberOfJobs; r++ {
		select {
		case <-done:
			fmt.Println("one job done on time")
		case <-time.After(2 * time.Second):
			fmt.Println("time out")
		}
	}
}
// case3
// workers: m
// jobs: n
// require goroutine send result of each job back to main
// use 2nd channel to send result from goroutine back to main
// use for loop to iterate same number of loops as number of jobs
// to sync up with goroutine
package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers = 3
const numberOfJobs = 5

func worker(id int, job chan string, result chan string) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(1 * time.Second)
		result <- receivedJob + " done by worker " + strconv.Itoa(id)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}

func main() {
	job := make(chan string, numberOfJobs)
	result := make(chan string)
	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, result)
	}
	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)
	for r := 0; r < numberOfJobs; r++ {
		received := <-result
		fmt.Printf("received result %d : %s\n", r, received)
	}
}
// case4:
// workers: m
// jobs: n
// require goroutine send result of each job back to main
// if main can't receive a result within the preset time, main
// times out and ignores it
// use 2nd channel to send result from goroutine back to main
// use for loop + select + time.After() to sync up with goroutine
package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers = 3
const numberOfJobs = 5

func worker(id int, job chan string, result chan string) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2 * time.Second)
		result <- receivedJob + " done by worker " + strconv.Itoa(id)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}

func main() {
	job := make(chan string, 2)
	result := make(chan string)
	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, result)
	}
	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)
	for r := 0; r < numberOfJobs; r++ {
		select {
		case received := <-result:
			fmt.Printf("received result %d : %s\n", r, received)
		case <-time.After(3 * time.Second):
			fmt.Printf("time out\n")
		}
	}
}
Thanks
Cases 1, 2 and 3 make sense. Case 4 makes no sense. I will explain in terms of control and data synchronization between threads.
Case 1: this is commonly done when the workers are 100% guaranteed to return and no data synchronization is needed.
If you notice, the control is very passive. It’s very easy to lose control if the worker is not 100% guaranteed to return (as in, main will wait forever).
I consider case 2 to be the same as case 1. There is not much difference except for setting a termination deadline.
Case 3 is the one recommended in your final plan. It is slightly more complicated than case 1, in exchange for more direct and active control at main.
Normally, we have main act as an active coordinator (like the boss of a company) instead of working on some task itself. It builds all the necessary channels for all goroutines to talk to one another. In other words, each goroutine has its own dedicated channel for receiving incoming messages. E.g.:
- main creates its own receiving channel m1 for receiving completed signals.
- main delegates to an encoder goroutine with r1 channel, passing m1 channel to it.
- main delegates work to a worker goroutine with w1 channel, passing r1 and m1 channels to it.
- main waits for all workers’ completion.
- worker goroutine directly passes output to r1 channel. Once done, it notifies main that it’s done via m1. When main receives the signal, it drops w1 from its future communications since it’s done.
- Once main confirms the termination period (either successful working / timeout), main will notify all worker channels including r1 for graceful closing (if needed). Otherwise, it terminates the program.
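The coordinator layout described above might be sketched like this. It is only one possible reading of the steps: the channel names m1, r1 and w1 come from the list, while everything else is assumed for illustration, including the single worker, the upper-casing "encoding" step, the coordinate helper and the omission of the timeout branch.

```go
package main

import (
	"fmt"
	"strings"
)

// encoder owns r1: it receives raw output, "encodes" it (upper-casing is a
// placeholder) and notifies main on m1 once r1 has been closed.
func encoder(r1 <-chan string, m1 chan<- string, out *[]string) {
	for raw := range r1 {
		*out = append(*out, strings.ToUpper(raw))
	}
	m1 <- "encoder"
}

// worker owns w1: it receives tasks, passes its output straight to r1 and
// notifies main on m1 once w1 has been closed.
func worker(w1 <-chan string, r1 chan<- string, m1 chan<- string) {
	for task := range w1 {
		r1 <- task + " done"
	}
	m1 <- "worker"
}

// coordinate (hypothetical helper) plays the role of main: it builds every
// channel, delegates the work and shuts everything down in order.
func coordinate(tasks []string) []string {
	m1 := make(chan string) // main's inbox for completion signals
	r1 := make(chan string) // encoder's inbox
	w1 := make(chan string) // worker's inbox
	var out []string

	go encoder(r1, m1, &out)
	go worker(w1, r1, m1)

	for _, t := range tasks {
		w1 <- t
	}
	close(w1) // no more work
	<-m1      // worker reports done; nothing sends on r1 any more
	close(r1) // graceful close for the encoder
	<-m1      // encoder reports done; out is now safe to read
	return out
}

func main() {
	fmt.Println(coordinate([]string{"job 0", "job 1"}))
}
```

Here main closes r1 only after the worker has reported on m1, so nothing can send on a closed channel.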
The main reason I consider that case 4 does not make sense is that you have 2-way communication established. main is an active coordinator, so it can terminate the program at will. Adding a timeout would be overkill.
I won’t consider a worker having a deadline as a “timeout”, since that is its expected task. Hence, it’s outside the context of this discussion.
I’m short on time right now. I will reply with your code reviews in a separate post later.
For case 2, it should not differ too far from case 1. Here’s one simple replacement (not tested).
package main

import (
	"fmt"
	"strconv"
	"time"
)

const (
	numberOfWorkers = 3
	numberOfJobs    = 5
)

func worker(id int, job chan string, m1 chan int) {
	// defer needs a function call, so the send is wrapped in a closure
	defer func() { m1 <- id }()
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2 * time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}

func main() {
	sig := make(chan int)
	job := make(chan string)
	workerCount := numberOfWorkers
	for w := 0; w < workerCount; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, sig)
	}
	// feed the jobs from a separate goroutine so main stays free to select
	go func() {
		for j := 0; j < numberOfJobs; j++ {
			job <- "job " + strconv.Itoa(j)
		}
		close(job)
	}()
	// one timer for the whole program; calling time.After inside the
	// select would restart the countdown on every loop iteration
	deadline := time.After(3 * time.Second)
	for {
		select {
		case id := <-sig:
			fmt.Printf("worker %d reported done\n", id)
			workerCount--
			if workerCount <= 0 {
				return
			}
		case <-deadline:
			fmt.Printf("program deadline reached. Exiting\n")
			return
		}
	}
}
Case 3 & case 4 are okay.
If a channel is single sender, multiple receiver, do not close it. Leave it to the garbage collector.
If you really need to close it, the channel must be single sender, single receiver, and only the sender closes the channel, so re-plan it. Always remember these rules about channels:
- A send to a nil channel blocks forever
- A receive from a nil channel blocks forever
- A send to a closed channel panics
- A receive from a closed channel returns the zero value immediately
- When in doubt, leave the channel to the garbage collector. It’s not compulsory to always close a channel.
Source from Dave Cheney.
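The four channel rules above can be checked with a small program. This is just a sketch with made-up helper names; the nil-channel case uses select with a default branch so the check can observe that neither operation is ready without actually blocking forever.

```go
package main

import "fmt"

// recvClosed receives from a closed channel: it returns the zero value
// immediately, and ok is false.
func recvClosed() (int, bool) {
	ch := make(chan int)
	close(ch)
	v, ok := <-ch
	return v, ok
}

// sendClosedPanics reports whether a send on a closed channel panics.
func sendClosedPanics() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics: send on closed channel
	return false
}

// nilChannelReady reports whether a send or a receive on a nil channel can
// ever proceed. Both would block forever, so select falls through to default.
func nilChannelReady() bool {
	var ch chan int // nil channel
	select {
	case ch <- 1:
		return true
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	v, ok := recvClosed()
	fmt.Println("receive from closed:", v, ok)
	fmt.Println("send to closed panics:", sendClosedPanics())
	fmt.Println("nil channel ready:", nilChannelReady())
}
```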
@hollowaykeanho, I’d like to mark this one as solution since I don’t want to make this post too long and I did learn a lot from this discussion. I’ll post more if I get more question. Thanks!