[solved] Concurrency - API request then update database

I am getting emails from a database and then checking their spam score. There are duplicate emails in the database. I can group the results by the email hash to get a list of unique emails. This way I don’t check the spam score of the same email more than once. After I have the spam score for an email, I want to update all the rows in the database that have that hash.

So, to do this, I thought I could have a few workers querying the API, another goroutine to query the database to get all the records I need. Then have a function to save the records as the API returns results. It doesn’t matter what order I do this, so I thought this would be a good opportunity to learn about concurrency.

I’ve tried several variations of the code below. The output is also below. The program never quits. I think I’m having a hard time figuring out where to close which channel.

Thank you :slight_smile:

package main

import (
	"log"
	"strconv"

	"github.com/fatih/color"
	"github.com/golevi/comint/models"
	"github.com/golevi/spamcheck"
)

func worker(in, out chan models.Email) {
	for email := range in {
		color.Yellow("Working %s\n", email.Hash)
		scr := spamcheck.NewRequest(email.Mail)
		resp, err := scr.CheckScore()
		if err != nil {
			log.Println(err)
		}
		score, err := strconv.ParseFloat(resp.Score, 64)
		email.SpamScore = score
		out <- email
		// log.Println(email.Hash, email.SpamScore)
	}
	color.Yellow("Worker quitting")
}

func sendWorkIn(in chan models.Email, limit int) {
	var emails []models.Email
	models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails)
	color.Red("Found %d hashes", len(emails))

	for _, email := range emails {
		color.Cyan("Sending %s\n", email.Hash)
		in <- email
	}
	close(in)
	color.Red("Finished sending emails")
}

func receiveResults(out chan models.Email, limit int) {
	color.Red("Receiving results...")
	tx := models.GetDB().Begin()
	for i := 0; i < limit; i++ {
		email := <-out
		tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
		color.Green("Updated %s\n", email.Hash)
	}
	tx.Commit()
	color.Red("Finished updating database")
}

func main() {
	in, out := make(chan models.Email), make(chan models.Email)
	color.Red("Starting...")

	workers := 10
	for i := 0; i < workers; i++ {
		go worker(in, out)
	}
	limit := 100
	go sendWorkIn(in, limit)
	// wait for all the work to get done
	receiveResults(out, limit)
}

Instead of:

for {
    select {
    case <-out:
        email := <- out
        // ...

Howabout just:

for email := range out