Hello!
I’ve done some research on this topic and can’t really find an answer. Coming from the Node.js world, where a TCP connection “sends” me data through an event listener, I never had to worry about creating the byte array for a TCP client’s stream.
I’m writing an “ingestor” that takes TCP connections and puts their data in a Kafka topic. I want the data stream as it comes (non-empty messages only) and don’t care too much about parsing the messages right now. The modems that connect to my server are being designed to keep the connection open and send messages containing magnetometer data. In other words, I don’t know the size of a data packet in advance. My belief is that the infinite for loop in the handleRequest function in my code keeps filling up the buf variable on every read, which means buf could eventually fill up completely.
That said, the current approach does work, because I can produce a Kafka message on each read; my worry is only that buf eventually gets filled. I also looked at using ioutil.ReadAll, which reads until the client closes the connection or another error occurs, but then I can’t act on the data until that happens.
My hunch is that I need to use ReadAll and create a channel that carries portions of what ReadAll has read to some other code. But it feels… hacky. I also think that keeping TCP connections alive without a framework to tell me what to expect from each read is not the best way to do it, but TCP stuff is new to a lot of us on my team. For example, should a TCP client end a connection after delivering its data? Anyway, that’s beside the point.
Any suggestions as to how to approach this?
package main

import (
	"fmt"
	"net"
	"os"
)

const (
	CONN_HOST = "localhost"
	CONN_PORT = "9000"
	CONN_TYPE = "tcp"
	CONN_URL  = CONN_HOST + ":" + CONN_PORT
)

func main() {
	// Listen for incoming connections
	l, err := net.Listen(CONN_TYPE, CONN_URL)
	if err != nil {
		fmt.Println("Error listening:", err.Error())
		os.Exit(1)
	}
	// Close the listener when this application closes
	defer l.Close()
	fmt.Println("Listening on " + CONN_URL)
	for {
		// Wait for the next connection
		conn, err := l.Accept()
		if err != nil {
			fmt.Println("Error accepting connection:", err.Error())
			os.Exit(1)
		}
		// Handle each connection in its own goroutine
		go handleRequest(conn)
	}
}

func handleRequest(conn net.Conn) {
	// Close the connection when this handler returns
	defer conn.Close()
	// Buffer that holds incoming information
	buf := make([]byte, 1024)
	for {
		// n is the number of bytes this Read placed in buf
		n, err := conn.Read(buf)
		if err != nil {
			fmt.Println("Error reading:", err.Error())
			break
		}
		s := string(buf[:n])
		fmt.Println("Stuff", s)
		// Print the bytes actually read, not the fixed buffer size
		fmt.Println("len", n)
	}
}