Corrupted buffer on high-load

Hello, I’m writing simple server to connect to dlms meters and get readings from them, but faceing with corrupted buffer problem on high-load, but everything fine on few connections. It’s all what i’m doing.
(Don’t pay attention to the hard code, the idea was to test)

package internal

import (
	"encoding/hex"
	"fmt"
	"net"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"
)

type Meter struct {
	socket      *net.TCPConn
	state       State
	meterSerial string
	mutex       sync.Mutex
}


func NewMeter(socket *net.TCPConn) *Meter {
	return &Meter{
		socket: socket,
		state:  Connected,
	}
}

func (m *Meter) Read() {
	for {
		data := make([]byte, 1024)
		m.mutex.Lock()
		n, err := m.socket.Read(data)
		m.mutex.Unlock()
		if err == net.ErrClosed {
			m.socket.Close()
			return
		}
		if n > 0 {
			hexString := hex.EncodeToString(data[:n])
			fmt.Printf("Read %d bytes\nData: %s\n", n, hexString)
			m.OnData(hexString)
		}
	}
}

func (m *Meter) OnData(hexString string) {
	if hexString == Handshake {
		if m.state == Connected {
			response, err := hex.DecodeString(AARE)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadAARQ
			return
		} else {
			response, err := hex.DecodeString(Handshake)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			return
		}
	}

	if m.state == ReadAARQ {
		if hexString == AARQ {
			response, err := hex.DecodeString(GetSerialRequest)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadSerial
			return
		}
	}

	if m.state == ReadSerial {
		m.meterSerial = parseSerial(hexString)
		from := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
		m.getDailyBilling(from, time.Now())
		return
	}

	if m.state == ReadDailyBilling {
		parseDailyBilling(hexString)

		if hexString[18:20] == "02" && hexString[22:24] != "01" {
			response, _ := hex.DecodeString(fmt.Sprintf("0001001100010007c002c1%s", hexString[24:32]))
			go m.socket.Write(response)
			return
		}

		m.state = Idle
	}
}

func parseSerial(hexData string) string {
	if !strings.HasPrefix(hexData, "0001000100110012c401c100090c") {
		return ""
	}

	bytes, err := hex.DecodeString(hexData[28:])
	if err != nil {
		fmt.Println(err)
	}

	return string(bytes)
}

func (m *Meter) getDailyBilling(dates ...time.Time) {
	var from, to time.Time
	if len(dates) > 2 || len(dates) == 0 {
		return
	}
	if len(dates) == 1 {
		from = dates[0]
		to = dates[0].AddDate(0, 0, 1)
	}
	if len(dates) == 2 {
		from = dates[0]
		to = dates[1]
	}

	response, err := hex.DecodeString(fmt.Sprintf("0001001100010040c001c100070000620200ff0201010204020412000809060000010000ff0f02120000090c%s090c%s0100", dateToHex(from), dateToHex(to)))
	if err != nil {
		fmt.Println(err)
	}
	go m.socket.Write(response)
	m.state = ReadDailyBilling
}

func parseDailyBilling(hexData string) []map[string]interface{} {
	data := strings.Split(hexData, "0216090c")
	if len(data) == 0 {
		return nil
	}

	result := make([]map[string]interface{}, 0)

	for _, dailyData := range data[1:] {
		valid := true
		if !strings.HasPrefix(dailyData, "07") {
			valid = false
		}
		date := dlmsHexToDate(dailyData[0:24])
		vals := regexp.MustCompile(".{1,10}").FindAllString(dailyData[24:234], -1)

		reading := make([]int, 0)

		for _, val := range vals {
			if !strings.HasPrefix(val, "06") {
				valid = false
			}
			decVal, _ := hex.DecodeString(val[2:])
			reading = append(reading, int(decVal[0]))
		}

		if !valid {
			fmt.Println("whole buff:", data)
			fmt.Println("date:", dailyData[0:24])
			fmt.Println("vals:", vals)
			panic("Bleeeee")
		}

		if valid {
			result = append(result, map[string]interface{}{
				"date":    date,
				"reading": reading,
			})
		}
	}
	return result
}

func dateToHex(date time.Time) string {
	return fmt.Sprintf("0%02x%02x%02x%02x%02x%02x%02xFFFED400", date.Year(), int(date.Month()), date.Day(), int(date.Weekday()), date.Hour(), date.Minute(), date.Second())
}

func dlmsHexToDate(hexString string) time.Time {
	hexyear, _ := hex.DecodeString(hexString[0:4])
	hexmonth, _ := hex.DecodeString(hexString[4:6])
	hexday, _ := hex.DecodeString(hexString[6:8])
	hexhour, _ := hex.DecodeString(hexString[10:12])
	hexminute, _ := hex.DecodeString(hexString[12:14])
	hexsecond, _ := hex.DecodeString(hexString[14:16])

	year, _ := strconv.ParseInt(string(hexyear), 0, 16)
	month, _ := strconv.ParseInt(string(hexmonth), 0, 16)
	day, _ := strconv.ParseInt(string(hexday), 0, 16)
	hour, _ := strconv.ParseInt(string(hexhour), 0, 16)
	minute, _ := strconv.ParseInt(string(hexminute), 0, 16)
	second, _ := strconv.ParseInt(string(hexsecond), 0, 16)

	return time.Date(int(year), time.Month(month), int(day), int(hour), int(minute), int(second), 0, time.UTC)
}

Need more detail on corrupted buffer problem, what did you expected to see, and did saw?

Just briefly read you code, here are some suggestions (commented in code blocks). BTW, GitHub is more likely a better platform for Code Review.

func (m *Meter) OnData(hexString string) {
	if hexString == Handshake {
		if m.state == Connected {
			response, err := hex.DecodeString(AARE)
			if err != nil {
				fmt.Println(err) // should return if error occured
			}
			go m.socket.Write(response) // maybe use channel is a better choice, such that you won't have to lock the socket connection
			m.state = ReadAARQ
			return
		} else {
			response, err := hex.DecodeString(Handshake)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			return
		}
	}

	if m.state == ReadAARQ {
		if hexString == AARQ {
			response, err := hex.DecodeString(GetSerialRequest)
			if err != nil {
				fmt.Println(err)
			}
			go m.socket.Write(response)
			m.state = ReadSerial
			return
		}
	}

	if m.state == ReadSerial {
		m.meterSerial = parseSerial(hexString)
		from := time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local)
		m.getDailyBilling(from, time.Now())
		return
	}

	if m.state == ReadDailyBilling {
		parseDailyBilling(hexString) // return value is not used

		if hexString[18:20] == "02" && hexString[22:24] != "01" {
			response, _ := hex.DecodeString(fmt.Sprintf("0001001100010007c002c1%s", hexString[24:32]))
			go m.socket.Write(response)
			return
		}

		m.state = Idle
	}
}

Thaks for the answer, meters use ber encoding and i faced with 2 types of buffer corruption:

  1. Buffer size less than expected.
    For ex. 00 01 00 01 00 11 00 04 c0 01 c4 01
    where 00 04 is uint16 which is PDU size, sometimes i get PDU less than 4 bytes
  2. Invalid bytes.
    For ex. instead of 00 01 00 01 00 11 00 04 c0 01 c4 01 get 00 01 00 01 00 11 00 04 c0 01 c4 21 something like this.

Do you mind to book a meeting session, let’s discuss about your code online?