One Kafka Consumer to feed multiple gRPC clients

Hello!

I’m getting the hang of Go, but I’m stumped on how to approach the following problem. I’m consuming from a Kafka topic, and I want every message that comes in from this topic to be sent to multiple gRPC clients via a streaming RPC call. The latter probably isn’t important. The problem I’m having is how to repeat each message from this topic to every connected client.

This implementation attempted to use a single channel shared by every connection. However, as I just realized, Go channels are sort of like Kafka topics themselves: they don’t fan out, so each value passed in is received by only one consumer of the channel rather than every consumer getting a copy.

This is my first attempt at it. What ends up happening here is that if there are no connections, the goroutine where I’m polling the consumer stalls, since it’s blocked sending the received message to the channel. When a connection does exist, the GetEchos connection only receives the first message.

I thought about reversing the roles. Maybe each GetEchos call could create its own channel and pass it, over another channel, to my main consuming goroutine, which could then iterate over the registered channels and send each message to all of them. But then managing those channels in some sort of array seems daunting.

How could I best approach this?

package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	pb "github.com/parkhub/scout-echo/echo"
	"google.golang.org/grpc"
)

// echoServer implements the Echo gRPC service. It holds the channel of
// messages that GetEchos streams to its clients.
type echoServer struct {
	// scoutMessages is the single channel every GetEchos call receives from.
	// Because a channel delivers each value to only one receiver, concurrent
	// GetEchos calls compete for messages rather than each getting a copy.
	scoutMessages chan pb.Message
}

// GetEchos streams messages received from the server's scoutMessages channel
// to the calling client.
//
// It blocks until one of the following happens:
//   - the stream's context is done (client disconnected, RPC cancelled, or
//     the server stopped), in which case the context's error is returned;
//   - sending a message on the stream fails, in which case that error is
//     returned;
//   - the scoutMessages channel is closed, in which case nil is returned and
//     the stream ends cleanly.
//
// Note: every GetEchos call receives from the same channel, so each message
// is delivered to exactly one connected client, not broadcast to all of them.
func (server *echoServer) GetEchos(_ *pb.Empty, stream pb.Echo_GetEchosServer) error {
	ctx := stream.Context()

	for {
		// Block until there is something to do. A `default` branch here would
		// turn the loop into a busy-wait that burns a full CPU core per client.
		select {
		case <-ctx.Done():
			// Without this case the handler would outlive its client whenever
			// no further messages arrive to make Send fail.
			return ctx.Err()
		case message, ok := <-server.scoutMessages:
			if !ok {
				// Producer closed the channel: nothing more will ever arrive.
				return nil
			}

			fmt.Printf("Got message!")
			if err := stream.Send(&message); err != nil {
				return err
			}
		}
	}
}

// getEnv returns the value of the environment variable named key. When the
// variable is not set at all, fallback is returned instead. A variable that is
// set to the empty string counts as set, so "" is returned in that case.
func getEnv(key, fallback string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return fallback
	}

	return value
}

// main runs the echo service and exits with a non-zero status if it fails.
// All the work happens in run so that its deferred cleanup (closing the Kafka
// consumer and the listener) executes before os.Exit is called.
func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error: %v\n", err)
		os.Exit(1)
	}
}

// run wires a Kafka consumer to the Echo gRPC server and serves until the
// process receives SIGINT/SIGTERM, the consumer reports an error, or the gRPC
// server fails.
//
// Configuration is read from the environment, with defaults:
//   - BROKER_ADDR    (kafka:9092)          Kafka bootstrap servers
//   - GRPC_PORT      (50051)               TCP port the gRPC server listens on
//   - GROUP_ID       (SCOUT_ECHO_SERVICE)  Kafka consumer group id
//   - TCP_LIVE_TOPIC (ScoutTCPLiveStream)  Kafka topic to consume
//
// It returns nil after a signal-initiated shutdown and a descriptive error
// otherwise.
func run() error {
	broker := getEnv("BROKER_ADDR", "kafka:9092")
	connPort := getEnv("GRPC_PORT", "50051")
	groupID := getEnv("GROUP_ID", "SCOUT_ECHO_SERVICE")
	topic := getEnv("TCP_LIVE_TOPIC", "ScoutTCPLiveStream")

	scoutMessageChan := make(chan pb.Message)
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  broker,
		"group.id":           groupID,
		"session.timeout.ms": 6000,
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset": "earliest",
		},
	})
	if err != nil {
		return fmt.Errorf("creating consumer: %w", err)
	}
	defer consumer.Close()

	fmt.Printf("Created Consumer %v\n", consumer)

	if err := consumer.Subscribe(topic, nil); err != nil {
		return fmt.Errorf("subscribing to topic %q: %w", topic, err)
	}

	// Listen for incoming connections before starting the poll loop, so a
	// listen failure cannot leave a polling goroutine behind.
	lis, err := net.Listen("tcp", ":"+connPort)
	if err != nil {
		return fmt.Errorf("listening on port %s: %w", connPort, err)
	}
	// Close the listener when this application closes
	defer lis.Close()

	fmt.Println("Listening on Port:" + connPort)

	server := &echoServer{scoutMessages: scoutMessageChan}
	grpcServer := grpc.NewServer()
	pb.RegisterEchoServer(grpcServer, server)

	// stop tells the poll goroutine to quit; pollDone is closed once it has.
	// pollErr is written by the goroutine before pollDone is closed and only
	// read after pollDone has been received from.
	stop := make(chan struct{})
	pollDone := make(chan struct{})
	var pollErr error

	go func() {
		defer close(pollDone)
		// Whatever ends the poll loop, stop serving too: a server without a
		// consumer has nothing to stream. Stop (rather than GracefulStop) is
		// used because the streaming RPCs never finish on their own.
		defer grpcServer.Stop()

	readScoutMessages:
		for {
			// Non-blocking check for a shutdown request before each poll.
			select {
			case <-stop:
				break readScoutMessages
			case sig := <-sigChan:
				fmt.Printf("Caught signal %v: terminating\n", sig)
				break readScoutMessages
			default:
			}

			// Let's get events
			event := consumer.Poll(100)
			// If we don't have any type of event then restart the loop
			if event == nil {
				continue
			}

			switch e := event.(type) {
			case *kafka.Message:
				fmt.Printf("Message on %s\n", e.TopicPartition)

				// The channel is unbuffered, so this send waits for a client
				// handler to receive. Keep it interruptible so shutdown does
				// not hang while no client is connected.
				select {
				case scoutMessageChan <- pb.Message{OriginalConnAdd: e.Key, Packet: e.Value}:
				case sig := <-sigChan:
					fmt.Printf("Caught signal %v: terminating\n", sig)
					break readScoutMessages
				case <-stop:
					break readScoutMessages
				}
			case kafka.PartitionEOF:
				fmt.Printf("Reached %v\n", e)
			case kafka.Error:
				pollErr = e
				break readScoutMessages
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}()

	serveErr := grpcServer.Serve(lis)

	// Make sure the poll goroutine has finished before the deferred
	// consumer.Close runs, so Close never races with an in-flight Poll.
	close(stop)
	<-pollDone

	// ErrServerStopped only means Stop won the race against Serve, i.e. a
	// shutdown was requested before serving began.
	if serveErr != nil && serveErr != grpc.ErrServerStopped {
		return fmt.Errorf("serving gRPC: %w", serveErr)
	}
	if pollErr != nil {
		return fmt.Errorf("consuming from kafka: %w", pollErr)
	}

	return nil
}

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.