I am getting emails from a database and then checking their spam score. There are duplicate emails in the database. I can group the results by the email hash to get a list of unique emails. This way I don’t check the spam score of the same email more than once. After I have the spam score for an email, I want to update all the rows in the database that have that hash.
So, to do this, I thought I could have a few workers querying the API, another goroutine to query the database to get all the records I need. Then have a function to save the records as the API returns results. It doesn’t matter what order I do this, so I thought this would be a good opportunity to learn about concurrency.
I’ve tried several variations of the code below. The output is also below. The program never quits. I think I’m having a hard time figuring out where to close which channel.
Thank you
package main
import (
"log"
"strconv"
"github.com/fatih/color"
"github.com/golevi/comint/models"
"github.com/golevi/spamcheck"
)
func worker(in, out chan models.Email) {
for email := range in {
color.Yellow("Working %s\n", email.Hash)
scr := spamcheck.NewRequest(email.Mail)
resp, err := scr.CheckScore()
if err != nil {
log.Println(err)
}
score, err := strconv.ParseFloat(resp.Score, 64)
email.SpamScore = score
out <- email
// log.Println(email.Hash, email.SpamScore)
}
color.Yellow("Worker quitting")
}
func sendWorkIn(in chan models.Email, limit int) {
var emails []models.Email
models.GetDB().Select("hash").Group("hash").Where("spam_score = 0").Limit(limit).Find(&emails)
color.Red("Found %d hashes", len(emails))
for _, email := range emails {
color.Cyan("Sending %s\n", email.Hash)
in <- email
}
close(in)
color.Red("Finished sending emails")
}
func receiveResults(out chan models.Email, limit int) {
color.Red("Receiving results...")
tx := models.GetDB().Begin()
for i := 0; i < limit; i++ {
email := <-out
tx.Model(models.Email{}).Where("hash = ?", email.Hash).Updates(models.Email{SpamScore: email.SpamScore})
color.Green("Updated %s\n", email.Hash)
}
tx.Commit()
color.Red("Finished updating database")
}
func main() {
in, out := make(chan models.Email), make(chan models.Email)
color.Red("Starting...")
workers := 10
for i := 0; i < workers; i++ {
go worker(in, out)
}
limit := 100
go sendWorkIn(in, limit)
// wait for all the work to get done
receiveResults(out, limit)
}