Do I understand you correctly that you want to “lock” the key for the duration of the three-step operation? So that while a goroutine works on a task for a key, no other goroutine may work on a task for that key, even if such a task arrives on ch?
You would need some kind of transaction that spans the three basic operations. Does the API provide something like this? What kind of API is it?
I think the problem described is that “key” (a mutable object) is modified between Get and Set.
What object is it? I assume some struct pointer?
There is no general solution: you must structure your code so that once you pass an object through a channel, you don’t use the object on this side of the channel any more (you pass ownership).
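For illustration, here is a minimal sketch of such a hand-off (counter is a made-up type, not from your code):

package main

import "fmt"

// counter stands in for any mutable object sent over a channel.
type counter struct{ n int }

func main() {
	ch := make(chan *counter)
	done := make(chan struct{})
	go func() {
		c := <-ch // the receiver now owns c and may mutate it freely
		c.n++
		fmt.Println("consumer sees", c.n)
		close(done)
	}()
	c := &counter{n: 41}
	ch <- c
	// From here on this goroutine must not read or write c; doing so
	// would race with the consumer. Ownership has moved across the channel.
	<-done
}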
zero-master, can you show us the producer part of the channel (where key is created)?
Yes, it offers transactions. Here is the project: GitHub - dgraph-io/badger: Fast key-value DB in Go.
This project recommends batching updates within a single transaction to increase throughput.
A transaction fails with badger.ErrConflict if another goroutine modifies one of its keys in a concurrent transaction; the transaction can then be retried. If I understand correctly, every time the transaction fails I need to perform the batch SET/GET again, which obviously hurts throughput.
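Distilled, that retry pattern looks like this (a minimal sketch; retryUpdate is a hypothetical helper, assuming github.com/dgraph-io/badger is imported):

// retryUpdate re-runs fn until it commits or fails with a non-conflict error.
func retryUpdate(db *badger.DB, fn func(txn *badger.Txn) error) error {
	for {
		err := db.Update(fn)
		if err != badger.ErrConflict {
			return err // nil on a successful commit, or a genuine error
		}
		// ErrConflict: a concurrent transaction committed a write to one of
		// the keys this transaction read, so the whole Get/Set body re-runs.
	}
}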
I am just incrementing the value of the key.
You can think of it as a large number of key:value pairs on disk; I receive streams of keys from the network, and for each key I need to increment its value.
Here is a piece of code that highlights the issue.
Also, I am not able to figure out a way to exit the program once the consumers have consumed all N tasks (a sketch addressing this follows the listing).
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/dgraph-io/badger"
)
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

const (
	letterIdxBits = 6                    // 6 bits to represent a letter index
	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
)
func RandStringBytesMaskImprSrc(n int) string {
	// Seed from the global source (seeded in init) so that two goroutines
	// starting in the same nanosecond don't produce identical strings.
	src := rand.NewSource(rand.Int63())
	b := make([]byte, n)
	// src.Int63() generates 63 random bits, enough for letterIdxMax characters.
	for i, cache, remain := n-1, src.Int63(), letterIdxMax; i >= 0; {
		if remain == 0 {
			cache, remain = src.Int63(), letterIdxMax
		}
		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
			b[i] = letterBytes[idx]
			i--
		}
		cache >>= letterIdxBits
		remain--
	}
	return string(b)
}
// byteSlicetoInt64 decodes a varint-encoded counter; an empty or
// invalid buffer decodes as 0.
func byteSlicetoInt64(b []byte) int64 {
	x, _ := binary.Varint(b)
	return x
}

// Int64tobyteSlice varint-encodes a counter for storage.
func Int64tobyteSlice(num int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, num)
	return buf[:n]
}
func batchIncrement(db *badger.DB, keys []string, diffs []int64) {
	for {
		err := db.Update(func(txn *badger.Txn) error {
			for i, key := range keys {
				// Start from an encoded zero when the key does not exist yet.
				val := Int64tobyteSlice(0)
				item, err := txn.Get([]byte(key))
				if err != nil && err != badger.ErrKeyNotFound {
					return err
				}
				if err == nil {
					if val, err = item.Value(); err != nil {
						return err
					}
				}
				existingCount := byteSlicetoInt64(val)
				newCount := existingCount + diffs[i]
				if err := txn.Set([]byte(key), Int64tobyteSlice(newCount)); err != nil {
					return err
				}
				log.Println("batchIncrement previous count", existingCount, "for key", key, "by", diffs[i], "New count is", newCount)
			}
			return nil
		})
		switch {
		case err == nil:
			return // committed
		case err == badger.ErrConflict && !failOnConflict:
			// Another transaction modified one of our keys: retry the batch.
		default:
			log.Fatal(err)
		}
	}
}
func generateRandomKeys(keySize, batchSize int) []string {
	keys := make([]string, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		keys = append(keys, RandStringBytesMaskImprSrc(keySize))
	}
	return keys
}

// generateRandomDiffs returns batchSize increments in [0, n).
func generateRandomDiffs(n, batchSize int) []int64 {
	diffs := make([]int64, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		diffs = append(diffs, int64(rand.Intn(n)))
	}
	return diffs
}
const (
	batchBuffer      = 100
	batchSize        = 10
	batchDuration    = 10
	consumers        = 2
	producers        = 2
	keySize          = 20
	maxDiff          = 10
	N                = 10000
	runtimeInSeconds = 10
	failOnConflict   = true
)
func init() {
	rand.Seed(time.Now().UnixNano())
}

type batch struct {
	keys  []string
	diffs []int64
}
func producer() {
	for i := 0; i < producers; i++ {
		go func(id int) {
			fmt.Println("Creating producer", id)
			// Each producer builds one random batch up front and sends it
			// N/producers times, so the consumers repeatedly touch the same keys.
			b := batch{
				generateRandomKeys(keySize, batchSize),
				generateRandomDiffs(maxDiff, batchSize),
			}
			for j := 0; j < N/producers; j++ {
				tasks <- b
			}
		}(i) // pass i by value; the closure would otherwise share the loop variable
	}
}
func consumer(db *badger.DB) {
	for i := 0; i < consumers; i++ {
		fmt.Println("Creating consumer", i)
		go func() {
			// range is simpler than a single-case select, and terminates
			// if the channel is ever closed.
			for b := range tasks {
				batchIncrement(db, b.keys, b.diffs)
			}
		}()
	}
}
var tasks chan batch

func main() {
	tasks = make(chan batch, batchBuffer)

	opts := badger.DefaultOptions
	opts.Dir = "badger"
	opts.ValueDir = "badger"
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	go producer()
	go consumer(db)
	// Crude stop condition: sleep for a fixed time instead of waiting
	// for all N tasks to be consumed.
	time.Sleep(time.Second * runtimeInSeconds)
}