currently i have a scenario where i have huge file (for example im going to say 500k lines of text) and the idea is to use worker (threads) to process them by 100 each thread. after running my code, i still wonder why the goroutines consume the same line more than once? im guessing it’s racing to get the job done.
here’s my code
package main
import (
"log"
"bufio"
"fmt"
"encoding/csv"
"encoding/json"
"io"
"os"
"sync"
)
type IMDBDataModel struct {
Color string `json:"color"`
DirectorName string `json:"director_name"`
NumCriticForReviews string `json:"num_critic_for_reviews"`
Duration string `json:"duration"`
DirectorFacebookLikes string `json:"director_facebook_likes"`
Actor3FacebookLikes string `json:"actor_3_facebook_likes"`
Actor2Name string `json:"actor_2_name"`
Actor1FacebookLikes string `json:"actor_1_facebook_likes"`
Gross string `json:"gross"`
Genre string `json:"genres"`
Actor1Name string `json:"actor_1_name"`
MovieTitle string `json:"movie_title"`
NumVotedUser string `json:"num_voted_users"`
CastTotalFacebookLikes string `json:"cast_total_facebook_likes"`
Actor3Name string `json:"actor_3_name"`
FaceNumberInPoster string `json:"facenumber_in_poster"`
PlotKeywords string `json:"plot_keywords"`
MovieIMDBLink string `json:"movie_imdb_link"`
NumUserForReviews string `json:"num_user_for_reviews"`
Language string `json:"language"`
Country string `json:"country"`
ContentRating string `json:"content_rating"`
Budget string `json:"budget"`
TitleYear string `json:"title_year"`
Actor2FacebookLikes string `json:"actor_2_facebook_likes"`
IMDBScore string `json:"imdb_score"`
AspectRatio string `json:"aspect_ratio"`
MovieFacebookLikes string `json:"movie_facebook_likes"`
}
var iterated int64
var out []*IMDBDataModel
func populateString(input []IMDBDataModel, out []*IMDBDataModel, wg *sync.WaitGroup) {
for _ , data := range input {
out = append(out, &data)
}
wg.Done()
}
func consumeData(input <-chan *IMDBDataModel, wg *sync.WaitGroup){
defer wg.Done()
for data := range input {
iterated++
fmt.Printf("%d : %s\n", iterated, data.MovieTitle)
out = append(out, data)
}
fmt.Println("output size : ", len(out))
}
func processCSV(path string) (imdbList []IMDBDataModel){
csvFile, _ := os.Open(path)
reader := csv.NewReader(bufio.NewReader(csvFile))
for {
line, error := reader.Read()
if error == io.EOF {
break
} else if error != nil {
log.Fatal(error)
}
imdbList = append(imdbList,
IMDBDataModel{
Color: line[0],
DirectorName: line[1],
NumCriticForReviews : line[2],
Duration: line[3],
DirectorFacebookLikes: line[4],
Actor3FacebookLikes: line[5],
Actor2Name: line[6],
Actor1FacebookLikes: line[7],
Gross: line[8],
Genre: line[9],
Actor1Name: line[10],
MovieTitle: line[11],
NumVotedUser: line[12],
CastTotalFacebookLikes: line[13],
Actor3Name: line[14],
FaceNumberInPoster: line[15],
PlotKeywords: line[16],
MovieIMDBLink: line[17],
NumUserForReviews: line[18],
Language: line[19],
Country: line[20],
ContentRating: line[21],
Budget: line[22],
TitleYear: line[23],
Actor2FacebookLikes: line[24],
IMDBScore: line[25],
AspectRatio: line[26],
MovieFacebookLikes: line[27],
},
)
}
imdbJson, err := json.Marshal(imdbList)
if err != nil {
log.Println(imdbJson)
}
return
}
func main() {
imdbList := processCSV("movie_metadata.csv")
imdbChannel := make(chan *IMDBDataModel, 100) // buffer
var wg sync.WaitGroup
for i := 0; i < 5;i++ {
wg.Add(1)
go consumeData(imdbChannel,&wg)
}
for _ ,task := range imdbList {
imdbChannel <- &task
}
close(imdbChannel)
wg.Wait()
// for _, item := range out {
// fmt.Println(item.MovieTitle)
// }
fmt.Println("Total Channel :", len(imdbChannel))
fmt.Println("Total IMDB :", len(imdbList))
fmt.Println("Total Data: ", len(out))
fmt.Println("Iterated : ", iterated)
fmt.Println("Goroutines finished..")
}
after few suggestions on adding mutex and another channel, this is the modified consume function
func consumeData(input <-chan *IMDBDataModel, output chan *IMDBDataModel, wg *sync.WaitGroup) {
defer wg.Done()
for data := range input {
iterated++
// outLock.Lock()
// out = append(out, data)
// outLock.Unlock()
output <- data
}
}
however still consuming the same line (race occured) more than once.
....
My Date with Drew
My Date with Drew
My Date with Drew
My Date with Drew
My Date with Drew
Total Channel : 0
Total IMDB : 5044
Total Data: 4944
Iterated : 5000
Goroutines finished..