Very low performance of Go code written for Couchbase when inserting 5 lakh (500,000) records

Can anyone please help me tune the performance of my Couchbase inserts by making my code run fast and efficiently? In C++ it takes only 19 seconds to insert 5 lakh (500,000) records into the same database, but my Go code takes around 19 minutes to do the same.
Below are the configuration values:
"NumofDoc":100000,
"ThreadCount":5,

    package main
    
    import (
        "fmt"
        "gopkg.in/couchbase/gocb.v1"
        "strconv"
        "time"
        //"io/ioutil"
        //"strings"
        "github.com/tkanos/gonfig"
        //"os"
        //"encoding/json"
        "math/rand"
    //    "math"
       )   
        
    // bucket is the shared Couchbase bucket handle: main opens it once and
    // createDocument/getDocument use it for every operation.
    var bucket *gocb.Bucket
    
    // Insert_doc is the document body that createDocument upserts and that
    // getDocument decodes into.
    type Insert_doc struct {
        Thread_id  int    // id of the worker that wrote the document
        KTAB       string // set to "INSERT" by worker
        SyncBuffer string // payload string
    }
    
    // Configuration is the set of values main loads from Couchbase_config.json.
    type Configuration struct {
        // NumofDoc is the number of requests issued per worker.
        NumofDoc int
        // Username and Password authenticate against the cluster; BucketName is
        // the bucket main opens.
        Username, Password, BucketName string
        // ThreadCount is the number of worker goroutines. OP_TYPE selects the
        // workload: 1 = writes only, 2 = reads only, 3 = 50/50 mix.
        ThreadCount, Port, OP_TYPE int
    }
    
    // main loads Couchbase_config.json, connects to the cluster, opens the shared
    // bucket, hands NumofDoc jobs to each of ThreadCount worker goroutines, and
    // prints the start/end Unix timestamps once every worker has finished.
    func main() {
        configuration := Configuration{}
        // A failed load would leave every setting at its zero value, so stop here
        // instead of silently running an empty benchmark.
        if err := gonfig.GetConf("Couchbase_config.json", &configuration); err != nil {
            fmt.Println(err.Error())
            return
        }
        fmt.Println("Config File name Passed :  Couchbase_config.json")
        fmt.Println("ThreadCount : ", configuration.ThreadCount)
        fmt.Println("Number of Requests per thread : ", configuration.NumofDoc)

        //	 if you have more than 5 core, make it 5 to use 5 os thread runtime.GOMAXPROCS(5)
        cluster, err := gocb.Connect("couchbase://") //Connects to the cluster
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        cluster.Authenticate(gocb.PasswordAuthenticator{
            Username: configuration.Username,
            Password: configuration.Password,
        })
        // Printf rather than Println: only the formatting variants expand %v.
        fmt.Printf("Cluster:%v\n", cluster)
        bucket, err = cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
        if err != nil {
            fmt.Println(err.Error())
            return
        }
        fmt.Printf("Bucket:%v\n", bucket)

        results := make(chan interface{}, configuration.NumofDoc)
        // done receives one value per worker after that worker has returned.
        done := make(chan struct{})
        var workerQueues []chan int

        start := time.Now()       // current local time
        starttime := start.Unix() // number of seconds since January 1, 1970 UTC
        for i := 0; i < configuration.ThreadCount; i++ {
            queue := make(chan int, configuration.NumofDoc) // one job queue per worker
            workerQueues = append(workerQueues, queue)
            go func(id int, queue <-chan int) {
                worker(id, queue, results, configuration.OP_TYPE)
                done <- struct{}{}
            }(i, queue)
        }
        for _, queue := range workerQueues {
            for j := 1; j <= configuration.NumofDoc; j++ {
                queue <- j
            }
            close(queue)
        }
        // Drain one result per job so that no worker blocks on a full results channel.
        for a := 1; a <= configuration.NumofDoc*len(workerQueues); a++ {
            <-results
        }
        // worker prints its statistics after sending its last result, so also
        // wait for every worker to return before main exits.
        for range workerQueues {
            <-done
        }

        end := time.Now()     // current local time
        Endtime := end.Unix() // number of seconds since January 1, 1970 UTC
        fmt.Printf("Script Starting time : %v \n", starttime)
        fmt.Printf("Script Ending time   : %v  \n", Endtime)
    }
    
        func worker(id int, jobs <-chan int, results chan<- interface{},s int) {
        //	fmt.Printf("worker %d started \n", id)     // do your thread measurements here 
        thread_start := time.Now()      // current local time
        Thread_Start := thread_start.Unix()      // number of seconds since January 1, 1970 UTC
        //fmt.Printf("Thread_%d Starting time : %v \n",id,Thread_Start)
        var readproportion int
        var updateproportion int
        var opsSequence[100]int
        operation_type := s
        if operation_type == 1 {
            updateproportion = 100
            readproportion = 0
        } else if operation_type == 2 {
            updateproportion = 0
            readproportion = 100
        } else if operation_type == 3 {
                updateproportion = 50
                 readproportion = 50
        }
        count:=0
        for b := 0; b < updateproportion; b++ {
           opsSequence[b] =1
           count++
        }
        for b := 0; b < readproportion; b++ {
           opsSequence[count+b]=2
        }
        var insertCount int64 = 0
        var readCount int64 = 0
    
        	for j := range jobs {
            k := j%100;
            optype := opsSequence[k];
            //fmt.Println("operation_type : ",optype)
            var x int = int(readCount % 50000);
            switch(optype){
                case 1:
                   document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: RandomString(10000)}
                   test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(int(insertCount))
                   //fmt.Println("createDocument %v",test)
                   createDocument(test, &document)
                   insertCount++;
                   break;
                case 2:
                   test := "Go_Demo_"+strconv.Itoa(id)+"_"+strconv.Itoa(x)
                   //fmt.Println("getDocument %v",test)
                   getDocument(test) 
                   readCount++;
                   break;
                default:
                      fmt.Println("Invalid Operation Type ",optype)
    
            }
        		results <- jobs
        	}
        	//fmt.Printf("worker %d completed \n", id)
        thread_end := time.Now()      // current local time 
        Thread_End := thread_end.Unix()      // number of seconds since January 1, 1970 UTC
        //fmt.Printf("Thread_%d Ending time : %v \n",id,Thread_End)
        timediff := Thread_End - Thread_Start
        var avgLatency float64 = float64(timediff)/float64(insertCount+readCount);
        var opsPerSec float64 = 1/avgLatency;
        fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n",id, insertCount,readCount, timediff, avgLatency,opsPerSec);
        }
    
        // createDocument upserts document under documentId through the shared
        // bucket, passing 0 as the third Upsert argument. A failure is printed
        // and not returned to the caller.
        func createDocument(documentId string, document *Insert_doc) {
            if _, err := bucket.Upsert(documentId, document, 0); err != nil {
                fmt.Println(err.Error())
            }
        }
    // RandomString returns n characters drawn from [a-zA-Z0-9] using the
    // package-level math/rand source. It returns "" when n is zero or negative.
    func RandomString(n int) string {
        // A constant alphabet and a byte buffer avoid rebuilding a []rune alphabet
        // and re-encoding runes to UTF-8 on every call; every character is ASCII,
        // so the produced string is the same as with runes.
        const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
        if n <= 0 {
            return ""
        }
        buf := make([]byte, n)
        for i := range buf {
            buf[i] = letters[rand.Intn(len(letters))]
        }
        return string(buf)
    }
    // getDocument fetches documentId through the shared bucket and decodes it
    // into an Insert_doc that is then discarded. A failure is printed.
    func getDocument(documentId string) {
        var fetched Insert_doc
        _, err := bucket.Get(documentId, &fetched)
        if err == nil {
            // To inspect the result, json.Marshal(&fetched) and print it here.
            return
        }
        fmt.Println(err.Error())
    }

@sanjum080816 What values are you giving for NumOfDoc and ThreadCount?

 "NumofDoc":100000,
 "ThreadCount":5,

A suggestion would be not to build full-fledged code initially. Build the code in increments and run it as you develop, so that you get to know which code fragment you added is delaying your output. There is no way that the Go code is slower than the C++ code if it's doing the same thing.

Make a copy of your existing file and give this basic code a try (Using wait groups instead of channels):

package main

import (
"fmt"
"sync"
"time"
)

// worker is the template goroutine body started by main; wg is the WaitGroup
// that main waits on.
func worker(wg *sync.WaitGroup, /* Other arguments */) {
  
  // Deferred so the WaitGroup counter is decremented whenever worker returns.
  defer wg.Done()

  // Your worker code
}

// main starts the workers and blocks until each of them has called wg.Done.
func main() {
	const threadCount = 5 // Number of threads (5 in your case)

	var wg sync.WaitGroup
	// Register every worker up front; each one calls wg.Done exactly once.
	wg.Add(threadCount)
	for started := 0; started < threadCount; started++ {
		go worker(&wg, /* Other arguments */)
	}

	wg.Wait()
}

Give it a try without concurrency as well. Try 1 lakh (100,000) records without goroutines and compare the performance.

2 Likes

I tried this approach with a few modifications to how the Couchbase connection is made. I passed a document size of 4 KB and inserted 5 lakh records. It takes 200 seconds to complete; the same workload in C++ completed in 10 seconds.

Below is the code :

package main
import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
    "strconv"
    "time"
    "sync"
    "github.com/tkanos/gonfig"
    "math/rand"
)   
    
// Insert_doc is the document body that createDocument upserts and that
// getDocument decodes into.
type Insert_doc struct {
	Thread_id  int    // id of the worker that wrote the document
	KTAB       string // set to "INSERT" by worker
	SyncBuffer string // random payload string
}

// Configuration is the set of values loaded from Couchbase_config.json.
type Configuration struct {
	// NumofDoc is the number of requests issued per worker.
	NumofDoc int
	// ServerIp is the connection string passed to gocb.Connect; Username and
	// Password authenticate against the cluster.
	ServerIp, Username, Password string
	// Randstr is the length passed to RandomString for each document payload.
	Randstr int
	// BucketName is the bucket every worker opens.
	BucketName string
	// ThreadCount is the number of worker goroutines. OP_TYPE selects the
	// workload: 1 = writes only, 2 = reads only, 3 = 50/50 mix.
	ThreadCount, Port, OP_TYPE int
}
// main loads Couchbase_config.json, starts ThreadCount workers and waits for
// all of them to finish.
func main() {
	configuration := Configuration{}
	// A failed load would leave ThreadCount at zero and silently do nothing.
	if err := gonfig.GetConf("Couchbase_config.json", &configuration); err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Config File name Passed :  Couchbase_config.json")
	fmt.Println("ThreadCount : ", configuration.ThreadCount)
	fmt.Println("Server IP : ", configuration.ServerIp)
	fmt.Println("Number of Requests per thread : ", configuration.NumofDoc)

	var wg sync.WaitGroup
	for i := 0; i < configuration.ThreadCount; i++ {
		// Exactly one Add per goroutine: each worker calls wg.Done once, so
		// Add(2) would leave the counter above zero and wg.Wait would never return.
		wg.Add(1)
		go worker(&wg, i, configuration.OP_TYPE)
	}

	wg.Wait()
}

// worker opens its own Couchbase connection, performs NumofDoc operations and
// prints per-thread totals.
//
// wg is released exactly once when worker returns. id is embedded in every
// document key ("Go_Demo_<id>_<n>"). s is the OP_TYPE: 1 = writes only,
// 2 = reads only, 3 = 50 writes followed by 50 reads in every 100 operations.
func worker(wg *sync.WaitGroup, id int, s int) {
	// Deferred so the early returns below still release main's wg.Wait.
	defer wg.Done()

	configuration := Configuration{}
	if err := gonfig.GetConf("Couchbase_config.json", &configuration); err != nil {
		fmt.Println(err.Error())
		return
	}

	cluster, err := gocb.Connect(configuration.ServerIp) //Connects to the cluster
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	cluster.Authenticate(gocb.PasswordAuthenticator{
		Username: configuration.Username,
		Password: configuration.Password,
	})

	bucket, err := cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// Each worker owns its connection, so release it when the worker is done.
	defer bucket.Close()

	var updateproportion, readproportion int
	switch s {
	case 1:
		updateproportion, readproportion = 100, 0
	case 2:
		updateproportion, readproportion = 0, 100
	case 3:
		updateproportion, readproportion = 50, 50
	}

	// opsSequence[k] is 1 for a write and 2 for a read. Any other OP_TYPE
	// leaves it zeroed, which the loop reports as an invalid operation.
	var opsSequence [100]int
	for b := 0; b < updateproportion; b++ {
		opsSequence[b] = 1
	}
	for b := 0; b < readproportion; b++ {
		opsSequence[updateproportion+b] = 2
	}

	// Build the payload once: regenerating a Randstr-sized random string for
	// every insert dominated the run time of the benchmark.
	payload := RandomString(configuration.Randstr)

	var insertCount, readCount int64
	threadStart := time.Now()
	for j := 0; j < configuration.NumofDoc; j++ {
		switch optype := opsSequence[j%100]; optype {
		case 1:
			document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: payload}
			key := "Go_Demo_" + strconv.Itoa(id) + "_" + strconv.Itoa(int(insertCount))
			createDocument(bucket, key, &document)
			insertCount++
		case 2:
			// Reads cycle over the first 5000 keys written by this worker id.
			key := "Go_Demo_" + strconv.Itoa(id) + "_" + strconv.Itoa(int(readCount%5000))
			getDocument(bucket, key)
			readCount++
		default:
			fmt.Println("Invalid Operation Type ", optype)
		}
	}

	// Keep sub-second precision: whole-second Unix timestamps turned short
	// runs into a latency of 0 and an ops/sec of +Inf (or NaN with no ops).
	elapsed := time.Since(threadStart)
	timediff := int64(elapsed / time.Second)
	totalOps := insertCount + readCount
	var avgLatency, opsPerSec float64
	if totalOps > 0 {
		avgLatency = elapsed.Seconds() / float64(totalOps)
	}
	if elapsed > 0 {
		opsPerSec = float64(totalOps) / elapsed.Seconds()
	}
	fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n", id, insertCount, readCount, timediff, avgLatency, opsPerSec)
}

// createDocument upserts document under documentId in bucket, passing 0 as the
// third Upsert argument. A failure is printed and not returned to the caller.
func createDocument(bucket *gocb.Bucket, documentId string, document *Insert_doc) {
	if _, err := bucket.Upsert(documentId, document, 0); err != nil {
		fmt.Println(err.Error())
	}
}

// RandomString returns n characters drawn from [a-zA-Z0-9] using the
// package-level math/rand source. It returns "" when n is zero or negative.
func RandomString(n int) string {
	// A constant alphabet and a byte buffer avoid rebuilding a []rune alphabet
	// and re-encoding runes to UTF-8 on every call; every character is ASCII,
	// so the produced string is the same as with runes.
	const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
	if n <= 0 {
		return ""
	}
	buf := make([]byte, n)
	for i := range buf {
		buf[i] = letters[rand.Intn(len(letters))]
	}
	return string(buf)
}

// getDocument fetches documentId from bucket and decodes it into an Insert_doc
// that is then discarded. A failure is printed.
func getDocument(bucket *gocb.Bucket, documentId string) {
	var fetched Insert_doc
	if _, err := bucket.Get(documentId, &fetched); err != nil {
		fmt.Println(err.Error())
	}
}

I did not dig too deep into the code. Just from first glance, I think this could be the issue.

In this code segment,

var wg sync.WaitGroup 
for i := 0; i < configuration.ThreadCount; i++ {
    wg.Add(2)
    go worker(&wg,i,configuration.OP_TYPE)
}

    wg.Wait()

You need to do wg.Add(1) and not 2. Every time you start a goroutine, you need to increment the counter by 1. Every time a worker function calls wg.Done(), the count decreases by 1.
So what is happening is that in each iteration the WaitGroup counter is increased by 2, so the total count is 2 * ThreadCount. However, every worker function calls wg.Done() only once, so after all workers finish the count is ThreadCount instead of 0.
wg.Wait() waits until the count is 0. Hence it waits much longer than the work itself takes; in fact, it can never return.

According to my knowledge, your code should be stuck at wg.Wait() after all the workers are done. Am I right? The code did not exit?

All you have to do is change wg.Add(2) to wg.Add(1). I think that should work.

Let me know if it does.

Sorry, my bad! I had actually tried it with wg.Add(1) and pasted the wrong code. When I tried wg.Add(2), it hung as you said.

The total count is 2 * thread count itself, as in the for loop;
I have taken this in the worker function. Could you please help? How can I improve this?

Performance degrades as the document size (generated through the random function) increases.

One thing you could try for debugging is, give thread count as 1, and print logs everywhere in the code “with time.Now()” or with a duration since the last log. Check which part of the code is taking too much time. We can focus only on that part then.

okay i will try this

Thank you Abhay, I found the issue: it was the random string generation, which ran every time the function was called. Thanks a lot for your quick help.

1 Like

Awesome! Glad to be of help :slight_smile:

1 Like

@sanjum080816 Also, do let me know once you’re completely done, what the final time taken was for go compared to C++. Would be a good information for everyone.

1 Like

Sure @abhayanoop. I am currently finished with the write operations: Go takes 26 seconds to insert 5 lakh records of 10 KB each, whereas C++ takes 18 seconds.

However, getDocument is a simple, direct get operation, yet the get takes 27 seconds, triple the time compared to C++ and Python. Usually a get is faster; in C++ and Python it took half the time of the insert operation (if the write takes 20 seconds, the read takes 10 seconds).

Any idea or suggestion on get document part ?

For starters, after translating from C++ you should refactor the code into idiomatic Go. In Go, you need to be very careful with O(n^2) for loops, as you lose compute cycles to small hidden operations (the []rune-to-string conversion is one of them, as in your other thread).

Another thing you can optimize is to pass errors back to the caller instead of printing them immediately. It's better to pass the error around than to call fmt to print the message directly. fmt printing uses interfaces, which can slow things down a lot at scale (you can check this by comparing string concatenation using fmt.Sprintf vs. +).

1 Like

I rectified these as per your suggestion, but I can still see that the get is not very fast; it takes the same time as the insert. My concern is: "Is the get expected to behave the same as the insert?" Because in C++ and Python the get takes half the time of the insert (if the insert takes 10 seconds, the get takes 5 seconds).

Would you mind pointing me to the supporting documentation (I believe it is CouchDB you're using)? By the way, where is the new code? Can I take a look at it?

1 Like

I am using Couchbase; below is my new code:

package main

import (
    "fmt"
    "gopkg.in/couchbase/gocb.v1"
    "strconv"
    "time"
    "sync"
    "github.com/tkanos/gonfig"
    "math/rand"
)   
    
// Insert_doc is the document body that createDocument upserts and that
// getDocument decodes into.
type Insert_doc struct {
	Thread_id  int    // id of the worker that wrote the document
	KTAB       string // set to "INSERT" by worker
	SyncBuffer string // payload string shared by all workers
}

// Configuration is the set of values loaded from Couchbase_config.json.
type Configuration struct {
	// NumofDoc is the number of requests issued per worker.
	NumofDoc int
	// ServerIp is the connection string passed to gocb.Connect; Username and
	// Password authenticate against the cluster.
	ServerIp, Username, Password string
	// Randstr is the length of the random payload that main generates once.
	Randstr int
	// BucketName is the bucket every worker opens.
	BucketName string
	// ThreadCount is the number of worker goroutines. OP_TYPE selects the
	// workload: 1 = writes only, 2 = reads only, 3 = 50/50 mix.
	ThreadCount, Port, OP_TYPE int
}
// main loads Couchbase_config.json, generates one random payload of Randstr
// characters, starts ThreadCount workers that share it, and waits for them.
func main() {
	configuration := Configuration{}
	// A failed load would leave ThreadCount at zero and silently do nothing.
	if err := gonfig.GetConf("Couchbase_config.json", &configuration); err != nil {
		fmt.Println(err.Error())
		return
	}
	fmt.Println("Config File name Passed :  Couchbase_config.json")
	fmt.Println("ThreadCount : ", configuration.ThreadCount)
	fmt.Println("Server IP : ", configuration.ServerIp)
	fmt.Println("Number of Requests per thread : ", configuration.NumofDoc)

	rand.Seed(time.Now().UnixNano())
	// Generated once and shared by every worker, so the payload is not rebuilt
	// for each inserted document.
	random_string := randomString(configuration.Randstr)

	var wg sync.WaitGroup
	for i := 0; i < configuration.ThreadCount; i++ { //ThreadCount = 5
		wg.Add(1)
		go worker(&wg, i, configuration.OP_TYPE, random_string)
	}

	wg.Wait()
}

// worker opens its own Couchbase connection, performs NumofDoc operations and
// prints per-thread totals.
//
// wg is released exactly once when worker returns. id is embedded in every
// document key ("Go_Demo_<id>_<n>"). s is the OP_TYPE: 1 = writes only,
// 2 = reads only, 3 = 50 writes followed by 50 reads in every 100 operations.
// random_string is the payload stored in every inserted document.
func worker(wg *sync.WaitGroup, id int, s int, random_string string) {
	// Deferred first so the early returns below still release main's wg.Wait;
	// a defer placed at the end of the function is skipped by those returns.
	defer wg.Done()

	configuration := Configuration{}
	if err := gonfig.GetConf("Couchbase_config.json", &configuration); err != nil {
		fmt.Println(err.Error())
		return
	}

	cluster, err := gocb.Connect(configuration.ServerIp) //Connects to the cluster
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	cluster.Authenticate(gocb.PasswordAuthenticator{
		Username: configuration.Username,
		Password: configuration.Password,
	})

	bucket, err := cluster.OpenBucket(configuration.BucketName, "") //Connects to the bucket
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	// Each worker owns its connection, so release it when the worker is done.
	defer bucket.Close()

	var updateproportion, readproportion int
	switch s {
	case 1:
		updateproportion, readproportion = 100, 0
	case 2:
		updateproportion, readproportion = 0, 100
	case 3:
		updateproportion, readproportion = 50, 50
	}

	// opsSequence[k] is 1 for a write and 2 for a read. Any other OP_TYPE
	// leaves it zeroed, which the loop reports as an invalid operation.
	var opsSequence [100]int
	for b := 0; b < updateproportion; b++ {
		opsSequence[b] = 1
	}
	for b := 0; b < readproportion; b++ {
		opsSequence[updateproportion+b] = 2
	}

	var insertCount, readCount int64
	threadStart := time.Now()
	for j := 0; j < configuration.NumofDoc; j++ { // NumofDoc = 100000
		switch optype := opsSequence[j%100]; optype {
		case 1:
			document := Insert_doc{Thread_id: id, KTAB: "INSERT", SyncBuffer: random_string}
			key := "Go_Demo_" + strconv.Itoa(id) + "_" + strconv.Itoa(int(insertCount))
			createDocument(bucket, key, &document)
			insertCount++
		case 2:
			// Reads cycle over the first 5000 keys written by this worker id.
			key := "Go_Demo_" + strconv.Itoa(id) + "_" + strconv.Itoa(int(readCount%5000))
			getDocument(bucket, key)
			readCount++
		default:
			fmt.Println("Invalid Operation Type ", optype)
		}
	}

	// Keep sub-second precision: whole-second Unix timestamps turned short
	// runs into a latency of 0 and an ops/sec of +Inf (or NaN with no ops).
	elapsed := time.Since(threadStart)
	timediff := int64(elapsed / time.Second)
	totalOps := insertCount + readCount
	var avgLatency, opsPerSec float64
	if totalOps > 0 {
		avgLatency = elapsed.Seconds() / float64(totalOps)
	}
	if elapsed > 0 {
		opsPerSec = float64(totalOps) / elapsed.Seconds()
	}
	fmt.Printf("THREAD_ID %d TOTAL WRITE : %d, TOTAL READ : %d, TOTAL OPERATION TIME : %d, AVG_LATENCY = %f S, OPS_PER_SECOND = %f \n", id, insertCount, readCount, timediff, avgLatency, opsPerSec)
}

// createDocument upserts document under documentId in bucket, passing 0 as the
// third Upsert argument. A failure is printed and not returned to the caller.
func createDocument(bucket *gocb.Bucket, documentId string, document *Insert_doc) {
	_, err := bucket.Upsert(documentId, document, 0)
	if err == nil {
		return
	}
	fmt.Println(err.Error())
}

// randomInt returns a pseudo-random int in the half-open range [min, max),
// drawn from the package-level math/rand source.
func randomInt(min, max int) int {
	span := max - min
	offset := rand.Intn(span)
	return offset + min
}

// randomString returns a string of len random upper-case letters A-Z, drawn
// from the package-level math/rand source. It returns "" when len is zero or
// negative.
func randomString(len int) string {
	if len <= 0 {
		return ""
	}
	buf := make([]byte, len)
	for i := range buf {
		// Intn(26) yields 0..25, so 'Z' is included; the previous half-open
		// range [65, 90) stopped at 'Y'.
		buf[i] = byte('A' + rand.Intn(26))
	}
	return string(buf)
}
// getDocument fetches documentId from bucket and decodes it into an Insert_doc
// that is then discarded. A failure is printed.
func getDocument(bucket *gocb.Bucket, documentId string) {
	var fetched Insert_doc
	_, err := bucket.Get(documentId, &fetched)
	if err == nil {
		return
	}
	fmt.Println(err.Error())
}

Please search for the supporting documentation on your side while I review your new code.

1 Like

I've done the code review. Can you test the source code from here: https://play.golang.org/p/vR0x91wYyoe

and see what runtime errors come up (I do not have any of your test bench)? If everything runs, I will then add the comments to your new source code.

1 Like