Hi.
I am trying to upload documents from four files to couchbase. Each file has 0.5 Million distinct documents. (File_1, File_2… etc). When I upload seperately, entire data i.e. 2 million documents are pushed. but when I use goroutines (with below code), some data is missed.
The output of the program is:
Starting Insertion of File 3…
Starting Insertion of File 1…
Starting Insertion of File 4…
Starting Insertion of File 2…
Insertion of File 3 Completed…
Insertion of File 1 Completed…
Insertion of File 4 Completed…
Insertion of File 2 Completed…
The code is as follows (error checking is ignored for simplicity, no error detected in the actual program):
package main
import (
"bufio"
"encoding/csv"
"fmt"
"io"
"log"
"strconv"
"os"
"sync"
"runtime"
"gopkg.in/couchbase/gocb.v1"
"time"
)
var (
bucket *gocb.Bucket
wg sync.WaitGroup
)
func main() {
Start := time.Now()
runtime.GOMAXPROCS(runtime.NumCPU())
logfile, _ := os.OpenFile("E:\\GO\\Utility\\Logs\\log.txt", os.O_APPEND, 0666)
cluster, _:= gocb.Connect("couchbase://localhost") //..........Establish Couchbase Connection
bucket, _= cluster.OpenBucket("example", "********")
Path := "E:\\GO\\Utility\\DATA\\File_"
for i := 1; i <= 4; i++{
wg.Add(1)
go InsertDataFromFile(Path+strconv.Itoa(i)+".txt", i)
}
wg.Wait()
_= bucket.Close() //.............. Close Couchbase Connection
Elapsed := time.Since(Start)
_, logError := fmt.Fprintf(logfile, "\n Execution Took:", Elapsed)
if logError != nil {
fmt.Println(logError)
}
}
/*-- Main function Ends Here --*/
func InsertDataFromFile(Path string, i int) (){
var (
ID string
JSONData string
items []gocb.BulkOp
)
csvFile, _ := os.Open(Path) //...............Open flat file containing data
reader := csv.NewReader(bufio.NewReader(csvFile))
reader.Comma = '$'
reader.LazyQuotes = true
counter := 1
fmt.Println("Starting Insertion of File "+ strconv.Itoa(i) + "...")
for {
line, error := reader.Read()
if error == io.EOF {
break
} else if error != nil {
log.Fatal(error)
}
ID = line[0] //...............Parse data and append it into items[] array
JSONData = line[1]
items = append(items, &gocb.UpsertOp{Key: ID, Value: JSONData})
if counter % 2000 == 0 {
BulkInsert(&items) //................Bulk Insert Next 2000 records into couchbase
items = nil
}
counter = counter + 1
}
BulkInsert(&items) //................Insert remaining documents if there are any
items = nil
fmt.Println("Insertion of File "+ strconv.Itoa(i) + " Completed...")
wg.Done()
}
func BulkInsert(item *[]gocb.BulkOp) (){
err := bucket.Do(*item)
if err != nil {
fmt.Println("ERROR PERFORMING BULK INSERT:", err)
}
}
Can you please suggest me what might be going wrong here?