Resolving channel conflict

I am actually trying to use an API with this kind of operation:

object := Get(key) // this is slow
object := increment(object, count) // increment count of this object
Set(key, object) // this is also slow

Issue: between the Get and the Set, the object may have been changed by another goroutine!

I am trying to do it with channel

worker(){
for {
select  {
case task := <-ch:
 Here is the operation I  described above
    }
}
}

I start N such workers.

I am getting a stream of key and count from external sources.

How I can prevent the issue I described above?

Do I understand you correctly that for the duration of the three-step operation you want to “lock” key? So that while a goroutine works on a task for a key, no other goroutine may work on a task for that key, even if such a task arrives on ch?

You would need some kind of transaction that spans the three basic operations. Does the API provide something like this? What kind of API is it?

I think the problem described is that “key” (a mutable object) is modified between Get and Set.

What object is it? I assume some structure pointer?

There is no solution: you must structure your code so that once you pass an object in a channel, you don’t use the object on this side of the channel any more (you pass ownership.)

zero-master, can you show us the producer part of the channel (where key is created)?

Yes, it offers transaction. Here is the project: https://github.com/dgraph-io/badger
This project recommends batching updates within a single transaction to increase throughput.
Transaction fails with badger.ErrConflict if any other goroutine modifies the key in some other transaction during this time. At this moment the transaction can be retried again. If I understand it correctly, every time the transaction fails I need to perform batch SET/GET again which obviously affects throughput.

I am just incrementing the value of the key.

You can think of it as large number of key:value pair on the disk and I receive streams of keys from the network for which I need to increment the value of the key.

Here is the piece of code which highlights the issue.
Also, I am not able to figure out a way to exit the program when consumers have consumed all N tasks.

package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/dgraph-io/badger"
)

const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
const (
	letterIdxBits = 6                    // 6 bits to represent a letter index
	letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
	letterIdxMax  = 63 / letterIdxBits   // # of letter indices fitting in 63 bits
)

// RandStringBytesMaskImprSrc returns a random string of length n drawn
// from letterBytes. It consumes 6 bits per candidate character from a
// 63-bit random cache, refilling the cache when exhausted, and discards
// out-of-range indices to keep the letter distribution uniform.
func RandStringBytesMaskImprSrc(n int) string {
	b := make([]byte, n)
	// Use the package-level rand source (seeded once in init). The
	// original constructed rand.NewSource(time.Now().UnixNano()) on every
	// call, so two calls within the same nanosecond produced identical
	// "random" keys — a real collision risk for key generation.
	for i, cache, remain := n-1, rand.Int63(), letterIdxMax; i >= 0; {
		if remain == 0 {
			cache, remain = rand.Int63(), letterIdxMax
		}
		// Indices >= len(letterBytes) are rejected rather than wrapped,
		// so every letter stays equally likely.
		if idx := int(cache & letterIdxMask); idx < len(letterBytes) {
			b[i] = letterBytes[idx]
			i--
		}
		cache >>= letterIdxBits
		remain--
	}

	return string(b)
}

func byteSlicetoInt64(b []byte) int64 {
	x, _ := binary.Varint(b)
	return x
}

func Int64tobyteSlice(num int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, num)
	return buf[:n]
}

// batchIncrement adds the given per-key deltas to the store inside one
// badger transaction, retrying the whole batch if the transaction hits
// a write conflict. keys and diffs are parallel slices: diffs[i] is
// added to the value stored under keys[i]; a missing key starts at 0.
//
// On any non-conflict error — or on conflict when failOnConflict is
// set — the process terminates via log.Fatal, matching the original
// demo behavior.
func batchIncrement(db *badger.DB, keys []string, diffs []int64) {
	for {
		err := db.Update(func(txn *badger.Txn) error {
			for i, key := range keys {
				count := diffs[i]
				val := Int64tobyteSlice(0) // encoded zero for absent keys
				item, err := txn.Get([]byte(key))
				switch {
				case err == nil:
					// Key exists: read its current encoded value.
					// (The original ignored this error and could decode
					// a stale/nil buffer.)
					if val, err = item.Value(); err != nil {
						return err
					}
				case err != badger.ErrKeyNotFound:
					return err
				}
				existingCount := byteSlicetoInt64(val)
				newCount := existingCount + count
				// The original dropped txn.Set's error as well.
				if err := txn.Set([]byte(key), Int64tobyteSlice(newCount)); err != nil {
					return err
				}
				log.Println("batchIncrement previous count", existingCount, "for key", key, "by", count, "New count is", newCount)
			}
			return nil
		})
		if err == nil {
			return
		}
		if err != badger.ErrConflict || failOnConflict {
			log.Fatal(err)
		}
		// ErrConflict with failOnConflict disabled: retry the batch.
	}
}

// generateRandomKeys returns batchSize random keys, each keySize
// characters long.
func generateRandomKeys(keySize, batchSize int) []string {
	// Pre-size: batchSize is known, so avoid repeated append growth.
	keys := make([]string, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		keys = append(keys, RandStringBytesMaskImprSrc(keySize))
	}
	return keys
}

// generateRandomDiffs returns batchSize random increments, each in
// [0, n).
//
// Bug fix: the original ignored its n parameter entirely and always
// used rand.Intn(100), so the maxDiff constant passed by the caller
// had no effect. The misleading local name `keys` is also renamed.
func generateRandomDiffs(n, batchSize int) []int64 {
	diffs := make([]int64, 0, batchSize)
	for i := 0; i < batchSize; i++ {
		diffs = append(diffs, int64(rand.Intn(n)))
	}
	return diffs
}

// Tunables for the demo workload.
const (
	batchBuffer      = 100 // capacity of the tasks channel
	batchSize        = 10 // keys (and diffs) per batch
	batchDuration    = 10 // unused in this file — presumably a batching window; TODO confirm
	consumers        = 2 // number of consumer goroutines
	producers        = 2 // number of producer goroutines
	keySize          = 20 // length of each random key
	maxDiff          = 10 // intended upper bound for increments; NOTE(review): generateRandomDiffs currently ignores it
	N                = 10000 // total batches sent across all producers
	runtimeInSeconds = 10 // how long main sleeps before the process exits
	failOnConflict   = true // log.Fatal on badger.ErrConflict instead of retrying
)

// init seeds the package-level math/rand source so each run produces
// different keys and diffs.
func init() {
	rand.Seed(time.Now().UnixNano())
}

// batch is one unit of work for a consumer: parallel slices where
// diffs[i] is the increment to apply to the value stored under keys[i].
type batch struct {
	keys  []string
	diffs []int64
}

// producer starts `producers` goroutines, each of which builds one
// random batch and sends that same batch N/producers times on the
// tasks channel.
//
// NOTE(review): the batch is generated once per goroutine, so the same
// keys are sent repeatedly — presumably deliberate to provoke write
// conflicts between consumers; confirm if fresh batches per send were
// intended.
func producer() {
	for i := 0; i < producers; i++ {
		// Pass i as an argument: before Go 1.22 all closures in this loop
		// shared a single loop variable, so every goroutine could print
		// the same (final) producer id.
		go func(id int) {
			fmt.Println("Creating producer", id)
			b := batch{
				generateRandomKeys(keySize, batchSize),
				generateRandomDiffs(maxDiff, batchSize),
			}
			for j := 0; j < N/producers; j++ {
				tasks <- b
			}
		}(i)
	}
}

// consumer starts `consumers` goroutines that pull batches off the
// tasks channel and apply them to db.
func consumer(db *badger.DB) {
	for i := 0; i < consumers; i++ {
		fmt.Println("Creating consumer", i)
		go func() {
			// A range over the channel replaces the original one-case
			// select: identical receive behavior, and the loop ends
			// cleanly if tasks is ever closed (the original spun on
			// zero-value batches after close).
			for b := range tasks {
				batchIncrement(db, b.keys, b.diffs)
			}
		}()
	}
}

// tasks carries batches from producers to consumers; allocated
// (buffered to batchBuffer) in main.
var tasks chan batch

// main opens the badger store, launches producers and consumers, and
// lets them run for runtimeInSeconds before exiting.
func main() {
	tasks = make(chan batch, batchBuffer)
	opts := badger.DefaultOptions
	opts.Dir = "badger"
	opts.ValueDir = "badger"
	db, err := badger.Open(opts)
	if err != nil {
		log.Fatal(err)
	}
	// Fix: the DB was never closed; badger requires Close to flush the
	// value log and release the directory lock.
	defer db.Close()
	go producer()
	go consumer(db)
	// NOTE(review): exiting on a timer is racy — in-flight batches can
	// be dropped. A sync.WaitGroup over the producers followed by
	// close(tasks), with consumers ranging over the channel, would let
	// the program exit exactly when all N tasks are consumed.
	time.Sleep(time.Second * runtimeInSeconds)
}

Someone on reddit gave me a better implementation: https://play.golang.org/p/AXEWJhSg9sT

If you turn on the flag failOnConflict, you’ll see the problem!

Discussion: https://www.reddit.com/r/golang/comments/84f5iq/how_do_i_cleanly_close_the_program_when_the/

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.