Return job results from worker

Hi,

The example below runs 3 goroutine workers that handle incoming jobs. This works fine for now. However, where I am stuck is, returning results per jobs. How do I accomplish this? Ideally, as soon as a job X is done, I should get the result back. https://play.golang.org/p/o4211X0k7bp

Thanks

package main

import (
	"fmt"
	"time"
)

func main() {
	w := NewWorker()
	w.Start()

	for i := 0; i < 19; i++ {
		j := NewJob(i)
		w.Add(j)
		
		time.Sleep(time.Second)
	}

	select {}
}

// ----------------------------------------------------------------------------------------

type Job struct {
	id int
}

func NewJob(id int) *Job {
	return &Job{
		id: id,
	}
}

// ----------------------------------------------------------------------------------------

type Worker struct {
	total   int
	channel chan *Job
}

func NewWorker() *Worker {
	return &Worker{
		total:   3,
		channel: make(chan *Job, 100),
	}
}

func (w *Worker) Start() {
	for i := 1; i <= w.total; i++ {
		go w.run(i)
	}
}

func (w *Worker) Add(job *Job) {
	w.channel <- job
}

func (w *Worker) run(id int) {
	fmt.Println("worker", id, "started...")

	for {
		select {
		case job := <-w.channel:
			fmt.Println(id, "processing job", job.id)

			// This is the result
			// fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String())
		default:
			time.Sleep(1 * time.Second)
			fmt.Println("sleeping")
		}
	}
}

@GoingToGo What do you mean by “get the result back?”

// This is the result
// fmt.Sprintf(“%d done @ %s”, job.id, time.Now().UTC().String())

Assume that job X has been processed and I want to return fmt.Sprintf("%d done @ %s", job.id, time.Now().UTC().String()) as result FROM THE WORKER.

There is channel exactly for this purpose.

Go by Example - Worker Pools

You could do it like this: https://play.golang.org/p/K26RKFauS4D

@skillian

I’ve literally completed about 5 minutes before your answer and looks 90% same as yours. However, the remaining 10% looks much cleaner than mine so I shall benefit from your example.

Thank you very much for the time spent though. Much appreciated.

@skillian

Two things.

  1. Your workers exit when there is no job but they should always be up and running so the application should never exit.
  2. How do we know if the result channel is empty in main()? I need to print something in such case.

This is my version without your changes so far: https://play.golang.org/p/AiqLLq1CLPd
Only need to address the 2nd point above but don’t know how.

@GoingToGo If the workers need to stay alive for the lifetime of the program, what does it mean for the result channel to be empty? You can do this:

// Print results.
loop:
for {
	select {
	case res, ok := <-w.Result():
		if !ok {
			// channel closed
			break loop
		}
		fmt.Println(res)
	default:
		fmt.Println("channel empty (for now)")
	}
}

But this will print "channel empty (for now)" over and over again until you get a result. You could add a time.Sleep(1 * time.Second), but then if 10 results complete very quickly in succession, it’ll take >1 second to handle each result.

Can you elaborate more on what you’re trying to do? What’s the requirement behind needing to print something when there are no more results? Do you just want to see that the program is still alive, like a heartbeat?

This is an experimental project to understand how channels and goroutines are used to build long running applications. Based on my use case, this is what I am doing.

This is a CLI app that listens for incoming requests and processes them before returning the results for each request. If a new request comes in, it prints “in”, once completed “out”, while waiting “waiting” (job channel is empty) and “empty” (result channel is empty) on the terminal so four scenarios.

I know some parts don’t make sense but at least it is more like for learning practises.

I had the same question and wanted to learn more about go concurrency and channels. This was also the reason I have built my own implementation.
Maybe my code might help you:

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.