Consuming with goroutines

currently i have a scenario where i have huge file (for example im going to say 500k lines of text) and the idea is to use worker (threads) to process them by 100 each thread. after running my code, i still wonder why the goroutines consume the same line more than once? im guessing it’s racing to get the job done.

here’s my code

package main

import (
     "log"
     "bufio"
     "fmt"
     "encoding/csv"
     "encoding/json"
     "io"
     "os"
     "sync"
)

type IMDBDataModel struct {
     Color                  string `json:"color"`
     DirectorName           string `json:"director_name"`
     NumCriticForReviews    string `json:"num_critic_for_reviews"`
     Duration               string `json:"duration"`
     DirectorFacebookLikes  string `json:"director_facebook_likes"`
     Actor3FacebookLikes    string `json:"actor_3_facebook_likes"`
     Actor2Name             string `json:"actor_2_name"`
     Actor1FacebookLikes    string `json:"actor_1_facebook_likes"`
     Gross                  string `json:"gross"`
     Genre                  string `json:"genres"`
     Actor1Name             string `json:"actor_1_name"`
     MovieTitle             string `json:"movie_title"`
     NumVotedUser           string `json:"num_voted_users"`
     CastTotalFacebookLikes string `json:"cast_total_facebook_likes"`
     Actor3Name             string `json:"actor_3_name"`
     FaceNumberInPoster     string `json:"facenumber_in_poster"`
     PlotKeywords           string `json:"plot_keywords"`
     MovieIMDBLink          string `json:"movie_imdb_link"`
     NumUserForReviews      string `json:"num_user_for_reviews"`
     Language               string `json:"language"`
     Country                string `json:"country"`
     ContentRating          string `json:"content_rating"`
     Budget                 string `json:"budget"`
     TitleYear              string `json:"title_year"`
     Actor2FacebookLikes    string `json:"actor_2_facebook_likes"`
     IMDBScore              string `json:"imdb_score"`
     AspectRatio            string `json:"aspect_ratio"`
     MovieFacebookLikes     string `json:"movie_facebook_likes"`
}

var iterated int64
var out []*IMDBDataModel

func populateString(input []IMDBDataModel, out []*IMDBDataModel, wg *sync.WaitGroup) {
     for _ , data := range input {          
          out = append(out, &data)
     }     
     wg.Done()
}

func consumeData(input <-chan *IMDBDataModel, wg *sync.WaitGroup){
     defer wg.Done()
     for data := range input {          
          iterated++          
          fmt.Printf("%d : %s\n", iterated, data.MovieTitle)
          out = append(out, data)
     }
     fmt.Println("output size : ", len(out))

}

func processCSV(path string) (imdbList []IMDBDataModel){
     csvFile, _ := os.Open(path)
     reader := csv.NewReader(bufio.NewReader(csvFile))

     for {          
          line, error := reader.Read()
          if error == io.EOF {
               break
          } else if error != nil {
               log.Fatal(error)
          }
          imdbList = append(imdbList, 
               IMDBDataModel{
                    Color: line[0],
                    DirectorName: line[1],
                    NumCriticForReviews : line[2],
                    Duration: line[3],
                    DirectorFacebookLikes: line[4],
                    Actor3FacebookLikes: line[5],
                    Actor2Name: line[6],
                    Actor1FacebookLikes: line[7],
                    Gross: line[8],
                    Genre: line[9],
                    Actor1Name: line[10],
                    MovieTitle: line[11],
                    NumVotedUser: line[12],
                    CastTotalFacebookLikes: line[13],
                    Actor3Name: line[14],
                    FaceNumberInPoster: line[15],
                    PlotKeywords: line[16],
                    MovieIMDBLink: line[17],
                    NumUserForReviews: line[18],
                    Language: line[19],
                    Country: line[20],
                    ContentRating: line[21],
                    Budget: line[22],
                    TitleYear: line[23],
                    Actor2FacebookLikes: line[24],
                    IMDBScore: line[25],
                    AspectRatio: line[26],
                    MovieFacebookLikes: line[27],
               },
          )          
     }
     imdbJson, err := json.Marshal(imdbList)
     if err != nil {
          log.Println(imdbJson)
     }

     return 
}

func main() {     
     imdbList := processCSV("movie_metadata.csv")     
     imdbChannel  := make(chan *IMDBDataModel, 100) // buffer

     var wg sync.WaitGroup
     for i := 0; i < 5;i++ {
          wg.Add(1)
          go consumeData(imdbChannel,&wg)     
     }

     for _ ,task := range imdbList {          
          imdbChannel <- &task               
     }

     close(imdbChannel)     
     wg.Wait()

     // for _, item := range out {
     //      fmt.Println(item.MovieTitle)
     // }

     fmt.Println("Total Channel :", len(imdbChannel)) 
     fmt.Println("Total IMDB :", len(imdbList))
     fmt.Println("Total Data: ", len(out))
     fmt.Println("Iterated : ", iterated)
     fmt.Println("Goroutines finished..")


}

after few suggestions on adding mutex and another channel, this is the modified consume function

func consumeData(input <-chan *IMDBDataModel, output chan *IMDBDataModel, wg *sync.WaitGroup) {
    defer wg.Done()
    for data := range input {
        iterated++
        // outLock.Lock()
        // out = append(out, data)
        // outLock.Unlock()
        output <- data
    }
}

however still consuming the same line (race occured) more than once.

....
My Date with Drew 
My Date with Drew 
My Date with Drew 
My Date with Drew 
My Date with Drew 
Total Channel : 0
Total IMDB : 5044
Total Data:  4944
Iterated :  5000
Goroutines finished..
1 Like

I think the problem is in this loop:

for _ ,task := range imdbList {          
    imdbChannel <- &task               
}

The address of task is the same for every iteration of the loop. Its contents change. The solution is imdbChannel <- task.

My guess is you were trying to save memory copies of all the IMDBDataModels. One way to do this is for processCSV to return []*IMDBDataModel instead of []IMDBDataModel.

4 Likes

Dude thanks a lot! you nailed it. That really was the problem.

2 Likes

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.