What's the issue with this code


(Holloway) #21

Related to your thread "What's issue with this goroutine code?": your code does not reflect your objective. The design focus is not the number 5. Generally speaking, your problem is not far from the 5 dining philosophers example I gave you.


(Jameswang2015) #22

OK, let me think these through. I’m new to golang and this type of concurrent/goroutine programming, so I may need more time and more tips.


(Holloway) #23

From your Step 2:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"time"
)

func httpCall(url string) {
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	received, err := client.Get(url)
	if err != nil {
		panic(err)
	}
	defer received.Body.Close()

	fmt.Println("status: ", received.StatusCode)
	scanner := bufio.NewScanner(received.Body)
	for i := 0; scanner.Scan() && i < 5; i++ {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}
func main() {
	urls := []string{
		"http://gobyexample.com",
		"http://att.com",
		"http://domaintools.com",
		"http://microsoft.com",
		"http://google.com",
	}
	for i := 0; i < len(urls); i++ {
		httpCall(urls[i])
	}
}

It’s clearly visible that httpCall is your goroutine (or worker). You can see clearly now that in order to spin 5 workers, your main would just have to do:

	for i := 0; i < len(urls); i++ {
		go httpCall(urls[i])
	}

In fact, you are already able to spin up n workers based on the number of urls, without specifying the count explicitly.


However, we got 2 problems:

  1. Can the worker dump the printout back to main (the one you’re looking for) instead of doing fmt printing on its own? If yes, how.
  2. How does main know how long should it wait for all independent goroutines to complete? Do we need a sync here to wait?

Incorporate these 2 problems into your Plan 1 to formulate Plan 2. Stop the code investigations. At this point, you should deal with the planning first.


(Jameswang2015) #24

Can the worker dump the printout back to main (the one you’re looking for) instead of doing fmt printing on its own? If yes, how.

Yes, I think we can use a channel to achieve this. To do so, we can add a string type channel as the 2nd argument of the goroutine and send the scanner.Text() (string) to this channel. In the main goroutine, we can receive this string from the channel, and we expect 5 receives from it.

How does main know how long should it wait for all independent goroutines to complete? Do we need a sync here to wait?

Continuing from the last answer: the main goroutine expects 5 receives from the channel, so I think I can build a for loop that receives from the channel 5 times, which syncs main up with the goroutines.

Let me know how you think.
Thanks


(Jameswang2015) #25

accordingly, my plan 2 is updated as below:

  • main:
    • make a string type channel
    • build a urls slice for 5 urls
    • create 5 goroutines for url search, accepting two arguments:
      • 1st argument is url string
      • 2nd argument is a string type channel
    • use for loop to receive 5 strings from the channel and print them out to screen - this also syncs up with goroutines
  • goroutine:
    • create http.Client with Timeout: 5 * time.Second
    • use client.Get(url) to scrape the website at the given url
    • check error, and defer received.Body.Close()
    • read first 5 lines of html body and concatenate them to a string
    • check error
    • send result to channel

(Holloway) #26

Excellent! sharp. We now have a channel contact switching between main and worker. So, remember my keyword:

WHAT
a string channel dedicated to each worker

WHY
for workers to asynchronously reply back to main, which prints the output.

WHO
owner: main
sender: worker
closer: ??? (is closing necessary?)

WHEN
creation: in the worker creation for loop. Main must manage the channels separately.

HOW

  1. main as receiver, read output from channel.
  2. worker as sender, send all output to channel.

You can now improve step 2 with channel implementation and reply back.


Does counting the messages guarantee that all workers are done?

Example:
You send 5 workers out to various shops to buy a random amount of bread. When they return, each one tells you they are back and hands you some random amount of bread.

As the main person, which signal is the stronger guarantee: counting the bread, or the workers informing you that they are back?

So if we plot it nicely…

WHAT
??? using what signal then determine the tool??

WHY
for main to sync up with all workers by waiting so that main can end the program.

WHO
owner: main
sender: worker

WHEN
creation: at main side.

HOW

  1. main as signal receiver.
  2. worker as signal sender.
  3. We don’t know yet until we decide WHAT to use.

Can you find out which tool is the best candidate for this problem?


(Holloway) #27

@jameswang2015, all right there?


(Jameswang2015) #28

@hollowaykeanho, sorry for the delay - super busy with my daytime development and now let’s continue this issue!

WHO
owner: main
sender: worker
closer: ??? (is closing necessary?)

In my step 2 plan, main relies on receiving exactly as many messages from the channel as there are workers, so close(chan) is not required in my case.

here is the code per my plan 2.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"time"
)

func httpCall(url string, result chan string) {
	client := http.Client{
		Timeout: 5 * time.Second,
	}
	received, err := client.Get(url)
	if err != nil {
		result <- err.Error()
		return
	}
	defer received.Body.Close()

	fmt.Println("status: ", received.Status)
	scanner := bufio.NewScanner(received.Body)
	var res string
	for i := 0; scanner.Scan() && i < 5; i++ {
		res += scanner.Text()
	}
	if err := scanner.Err(); err != nil {
		result <- err.Error()
		return
	}
	result <- url + "\n" + res
}
func main() {
	result := make(chan string)
	urls := []string{
		"http://gobyexample.com",
		"http://att.com",
		"http://domaintools.com",
		"http://microsoft.com",
		"http://google.com",
	}
	for i := 0; i < len(urls); i++ {
		go httpCall(urls[i], result)
	}
	for j := 0; j < len(urls); j++ {
		fmt.Println(<-result) // Println, so the result is not interpreted as a format string
	}
}

I’ll need to think about the better tool or more tips to improve it.


(Holloway) #29

There are 3 known tools at your disposal. I’m sure by now you know each of them in detail, so I’ll skip the explanation.

  1. channel
  2. sync.RWMutex
  3. sync.WaitGroup

Take your time and choose your signal based on the available tools. No.1 and No.3 are interchangeable in my Philosopher’s example.


(Jameswang2015) #30

I think WaitGroup could be the best fit in this case. In a more general scenario, let’s say we have m workers, n jobs, then main can use wg.Add(m) to explicitly wait for signals from all workers when all of them get n jobs done.

  • main:
    • create sync.WaitGroup wg.Add(m)
    • create channel1 for sending jobs from main to goroutine
    • (optional) if main needs to receive info from goroutine, then create channel2 for receiving info from workers to main
    • create m workers
      • 1st argument: channel1 for sending jobs to workers
      • (optional) 2nd argument: channel2 for receiving info from workers
      • 3rd argument: &wg(avoid global variable)
    • assign jobs through channel1
      • require close(channel1) to send signal to workers to stop
    • receive info through channel2
      • no need for closing channel2 - main receive info from workers as long as workers have info sent to channel2, use WaitGroup for terminate whole program
    • create wg.Wait()
  • workers:
    • keep receiving jobs through channel1
    • do the job
    • send result back to main via channel2
    • if receive close(channel1), then stop work and notify main

In this case main doesn’t expect to receive a result from every worker for every job; whenever a result is sent into channel2, main can handle it. main uses wg to sync up with the workers and terminate the entire program.

By the way, how many workers should we normally create to make the best use of resources for optimal performance?


(Holloway) #31

Perfect! Now upgrade your code again. You should get what you want by now. Congratulations! You just got deadlock-free and race-free concurrency. The development is not that complicated, is it? :grinning:

As for the number of workers, there is no straight answer. You need to run benchmarks to find out. Concurrency behavior varies between operating systems and hardware.

This is an optional next step: how to control the number of spawned goroutines so that we don’t overload the CPU. Obviously, you can do another loop in main that syncs them up. That’s another story.


As a wrap up, to plan for concurrency:

  1. Run it in a single process (main)
  2. Abstract the key functions into dedicated process (httpCalls). Isolate them.
  3. Refactor them to the finest.
  4. Identify the workers and their contact switching.
  5. Plan the controls between goroutines.
  6. Optimize it.

(Holloway) #32

About your main printout, you can consider using a for...select loop. It is cleaner and more sensible. E.g.:

  1. https://gobyexample.com/select

To integrate sync.WaitGroup into that for...select loop, you can follow tomaz’s way of doing things:


Or use 2 channels instead of sync.WaitGroup, like the one I implemented in Philosophers with fullness and c1 channel. In your case, it looks something like:

for {
    select {
    case message := <-msg:
        ... // handle message
    case <-ended:
        ... // handle completed count
        done++ // increase done count
        if done == headcount {
            return // once everyone is done. Exit.
        }
    }
}

(Jameswang2015) #33

@hollowaykeanho Thanks
I’m rethinking this with four scenarios; let me know if this makes any sense to you.

  • case1: one way communication (from main to goroutine only).

    • create m workers
    • assign n jobs
    • main won’t terminate until all goroutines get all jobs done
    • I’d like to use sync.WaitGroup{} to sync up between main and goroutines
  • case2: one way communication, but any job that exceeds a preset time is timed out and ignored

    • instead of wg, I’d like to use a dedicated channel for goroutines to notify main whenever a job is done. In main I’d like to use select to choose between this channel and a preset timer to implement the timeout
    • use a for loop with the same number of iterations as the number of jobs to sync up main and the goroutines
  • case3: two way communication (main send jobs to goroutines and goroutines send results back to main)

    • create n jobs
      • make the job channel buffered with size n - m to avoid deadlock
    • make another channel for goroutine to send result back to main
    • create m workers
    • assign n jobs
    • close(job) to notify goroutines
    • use for loop with same number of iteration as number of jobs to receive result of each job from goroutines
    • in the goroutine, use for ... range to keep receiving jobs, do them, and send results back to main until the job channel is closed
    • basically, use the result channel to sync up main and the goroutines
  • case4: two way communication with a time limit: main times out any result from the goroutines if the job isn’t done on time

    • use for loop with same number of iteration as number of jobs
    • use select to choose between result from goroutine and a timer
    • basically, use the result channel to sync up between main and goroutines

Below is the code for each case.

//case1:
// workers: m
// jobs: n
// no response is required from goroutine back to main
// main won't terminate until all goroutines finish jobs
// use sync.WaitGroup
package main

import (
	"fmt"
	"strconv"
	"sync"
	"time"
)

const numberOfWorkers  = 3
const numberOfJobs = 5

func worker(id int, job chan string, wg *sync.WaitGroup) {
	defer wg.Done()
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2*time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}
func main() {
	wg := sync.WaitGroup{}
	wg.Add(numberOfWorkers)
	job := make(chan string)

	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n",w)
		go worker(w, job, &wg)
	}

	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)

	wg.Wait()
}
//case2
// workers: m
// jobs: n
// no response is required from goroutine back to main
// main won't terminate until all goroutines finish jobs
// except the processing of each job by goroutine exceed preset timer
// use channel + select + time.After to control this

package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers  = 3
const numberOfJobs = 5

func worker(id int, job chan string, done chan bool) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(1*time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
		done <-true
	}
}
func main() {
	job := make(chan string, 2)
	done := make(chan bool)
	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n",w)
		go worker(w, job, done)
	}

	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)
	for r := 0; r < numberOfJobs; r++ {
		select {
		case <-done:
			fmt.Println("one job done on time")
		case <-time.After(2*time.Second):
			fmt.Println("time out")
		}
	}
}
//case3
// workers: m
// jobs: n
// require goroutine send result of each job back to main
// use 2nd channel to send result from goroutine back to main
// use for loop to iterate same number of loops as number of jobs
// to sync up with goroutine
package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers  = 3
const numberOfJobs = 5

func worker(id int, job chan string, result chan string) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(1*time.Second)
		result <-receivedJob + " done by worker " + strconv.Itoa(id)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}
func main() {
	job := make(chan string, numberOfJobs)
	result := make(chan string)

	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n",w)
		go worker(w, job, result)
	}

	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)

	for r := 0; r < numberOfJobs; r++ {
		received := <-result
		fmt.Printf("received result %d : %s\n", r, received)
	}
}
//case4:
// workers: m
// jobs: n
// require goroutine send result of each job back to main
// if main can't receive job in preset time then main get it
// times out and ignore it
// use 2nd channel to send result from goroutine back to main
// use for loop + select + time.After() to sync up with goroutine
package main

import (
	"fmt"
	"strconv"
	"time"
)

const numberOfWorkers  = 3
const numberOfJobs = 5

func worker(id int, job chan string, result chan string) {
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2*time.Second)
		result <-receivedJob + " done by worker " + strconv.Itoa(id)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}
func main() {
	job := make(chan string, 2)
	result := make(chan string)

	for w := 0; w < numberOfWorkers; w++ {
		fmt.Printf("created worker %d\n",w)
		go worker(w, job, result)
	}

	for j := 0; j < numberOfJobs; j++ {
		fmt.Printf("assigned job %d\n", j)
		job <- "job " + strconv.Itoa(j)
	}
	close(job)

	for r := 0; r < numberOfJobs; r++ {
		select {
		case received := <-result:
			fmt.Printf("received result %d : %s\n", r, received)
		case <-time.After(3*time.Second):
			fmt.Printf("time out\n")
		}
	}
}

Thanks


(Holloway) #34

Cases 1, 2, and 3 make sense. Case 4 does not. I will explain in terms of control and data synchronization between threads.

Case 1 is commonly done when the workers are 100% guaranteed to return and no data synchronization is needed.

Note that the control is very passive. It is very easy to lose control if the workers are not 100% guaranteed to return (main would wait forever).

I consider case 2 the same as case 1. There is not much difference except for setting a termination deadline.

Case 3 is the one recommended in your final plan. It is slightly more complicated than case 1 in exchange for more direct and active control at main.

Normally, we have main act as an active coordinator (like the boss of a company) instead of working on tasks itself. It builds all the necessary channels for the goroutines to talk to one another. In other words, each goroutine has its own dedicated channel for receiving incoming messages. E.g.:

  1. main creates its own receiving channel m1 for receiving completed signal.
  2. main delegated encoder goroutine with r1 channel, passing m1 channel to it.
  3. main delegated work to worker goroutine with w1 channel, passing r1 and m1 channels to it.
  4. main wait for all workers completion.
  5. worker goroutine directly passes output to r1 channel. Once done, it notifies main that it’s done via m1. When main receives the signal, it drops w1 from its future communications since it’s done.
  6. Once main confirms termination period (either successful working / timeout), main will notify all worker channels including r1 for graceful closing (if needed). Otherwise, it terminates the program.

I consider case 4 to make no sense mainly because you have 2-way communication established. main is an active coordinator, so it can terminate the program at will. Adding a timeout would be overkill.

I won’t consider a worker having a deadline as a “timeout” since that is part of its expected task. Hence, it’s outside the context of this discussion.


I’m short on time right now. I will reply with the code review in a separate post later.


(Holloway) #35

For case 2, it should not differ too much from case 1. Here’s one simple replacement (not tested).

package main

import (
	"fmt"
	"strconv"
	"time"
)

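const (
	numberOfWorkers = 3
	numberOfJobs    = 5
)

func worker(id int, job chan string, m1 chan int) {
	// defer takes a function call, so the send is wrapped in a closure.
	defer func() { m1 <- id }()
	for receivedJob := range job {
		fmt.Printf("worker %d received %s. working\n", id, receivedJob)
		time.Sleep(2 * time.Second)
		fmt.Printf("worker %d get %s done.\n", id, receivedJob)
	}
}

func main() {
	sig := make(chan int)
	// Buffered so main can queue every job and move on to the select loop.
	job := make(chan string, numberOfJobs)
	workerCount := numberOfWorkers

	for w := 0; w < workerCount; w++ {
		fmt.Printf("created worker %d\n", w)
		go worker(w, job, sig)
	}

	for j := 0; j < numberOfJobs; j++ {
		job <- "job " + strconv.Itoa(j)
	}
	close(job) // workers leave their range loop once the queue is drained

	// One timer for the whole program; time.After inside the loop would
	// restart on every iteration.
	deadline := time.After(3 * time.Second)

	for {
		select {
		case id := <-sig:
			fmt.Printf("worker %d signalled completion\n", id)
			workerCount--

			if workerCount <= 0 {
				return
			}
		case <-deadline:
			fmt.Printf("program deadline reached. Exiting\n")

			return
		}
	}
}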

Case 3 & case 4 are okay.

If a channel is single sender multiple receiver, do not close it. Leave it to garbage collector.

If you really need to close it, the channel must have a single sender and a single receiver, and only the sender closes it, so re-plan accordingly. Always remember these rules about channels:

  • A send to a nil channel blocks forever
  • A receive from a nil channel blocks forever
  • A send to a closed channel panics
  • A receive from a closed channel returns the zero value immediately
  • When in doubt, leave the channel to garbage collector. It’s not compulsory to always close a channel.

Source from Dave Cheney.
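The four channel rules above can be checked with a small program. This is only a sketch: `blocks` and `panics` are hypothetical helpers, and "blocks forever" is approximated by "still blocked after 50 milliseconds".

```go
package main

import (
	"fmt"
	"time"
)

// blocks reports whether op is still blocked after a short wait.
// The goroutine running a blocked op is simply abandoned.
func blocks(op func()) bool {
	finished := make(chan struct{})
	go func() {
		op()
		close(finished)
	}()
	select {
	case <-finished:
		return false
	case <-time.After(50 * time.Millisecond):
		return true
	}
}

// panics reports whether op panicked.
func panics(op func()) (p bool) {
	defer func() {
		if recover() != nil {
			p = true
		}
	}()
	op()
	return false
}

func main() {
	var nilCh chan int // nil channel: declared but never made
	closed := make(chan int)
	close(closed)

	fmt.Println("send to nil blocks:", blocks(func() { nilCh <- 1 }))
	fmt.Println("receive from nil blocks:", blocks(func() { <-nilCh }))
	fmt.Println("send to closed panics:", panics(func() { closed <- 1 }))

	v, ok := <-closed // returns immediately with the zero value
	fmt.Println("receive from closed:", v, ok)
}
```

The first three lines print `true`, and the last prints `receive from closed: 0 false`; the second value from the receive is how a receiver can tell a closed channel from a real zero.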