TCP server to read an unknown number of bytes but act on them as they arrive

Hello!

I’ve done some research on this topic and can’t really find an answer. Coming from the Node.js world, where a TCP connection “sends” me data through an event listener, I never had to worry about creating the byte array for a TCP client’s stream.

I’m writing an “ingestor” that accepts TCP connections and puts their data on a Kafka topic. I want the data stream as it comes (non-empty messages) and don’t care too much about parsing the messages right now. The modems that connect to my server are being designed to keep the connection open and send messages containing magnetometer data. In other words, I don’t know the size of the data packet. I believe the infinite for loop in the handleRequest function keeps filling the buf variable on every read, which means it could eventually fill up.

This approach does work, because I can produce a Kafka message on each read, but I could run into trouble if buf fills up. I also looked at ioutil.ReadAll, which reads until the client closes the connection or an error occurs, but then I can’t act on the data until that happens.

My hunch is that I need to use ReadAll and create a channel that carries portions of what ReadAll returns and feeds them to some other code. But it feels… hacky. I also think that keeping TCP connections alive without a framework to tell me what to expect from each read is not the best way to do it, but TCP is new to a lot of us on my team. For example, should a TCP client end a connection after delivering its data? Anyway, that’s beside the point.

Any suggestions as to how to approach this?

package main

import (
	"encoding/binary"
	"fmt"
	"net"
	"os"
)

const (
	CONN_HOST = "localhost"
	CONN_PORT = "9000"
	CONN_TYPE = "tcp"
	CONN_URL  = CONN_HOST + ":" + CONN_PORT
)

func main() {
	// Listen for incoming connections
	l, err := net.Listen(CONN_TYPE, CONN_URL)

	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}

	// Close the listener when this application closes
	defer l.Close()

	fmt.Println("Listening on " + CONN_URL)
	for {
		// Listen for connections
		conn, err := l.Accept()

		if err != nil {
			fmt.Println("Error accepting connection:", err.Error())
			os.Exit(1)
		}

		go handleRequest(conn)
	}
}

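func handleRequest(conn net.Conn) {
	// Make sure the connection is released when the loop exits
	defer conn.Close()

	// Buffer that holds incoming data; it is reused on every read
	buf := make([]byte, 1024)

	for {
		// n is the number of bytes placed in buf by this read
		n, err := conn.Read(buf)

		if err != nil {
			fmt.Println("Error reading:", err.Error())
			break
		}

		// Only buf[:n] is valid; the rest may hold bytes from earlier reads
		s := string(buf[:n])

		fmt.Println("Stuff", s)
		// binary.Size(buf) reports the fixed buffer size (1024), not n
		fmt.Println("len", binary.Size(buf))
	}
}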

Your code looks fine. Each iteration reads up to 1024 bytes into the buffer and prints the bytes received. The next iteration reuses the same buffer, overwriting it from the start. Without looking at the structure of the received data you can’t know whether the 789 bytes (or however many you got) are one message, half a message, or five messages. But if you’re just passing the data along, you probably don’t need to care.

You’re right. After playing around with it some, I realized that buf doesn’t keep growing into new space. Each read just overwrites its contents from the start, up to the buffer size per read.
