How to convert string into protoreflect.ProtoMessage or into anypb

I’m trying to use server sent event eventsoource through gRPC gateway, so read this
and this and came out with the below proto

syntax = "proto3";

package protobuf;
option go_package = "/proto";

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";

service Events {
  rpc StreamEvents (google.protobuf.Empty) returns (stream EventSource) {
    option (google.api.http) = {
      get: "/v1/rawdata/stream"
    };
  }
}

message Empty{}

message EventSource {
  string event = 1;
  google.protobuf.Any data = 2;
}

And generated all required gRPC files (Message/Service/Gateway) using:

protoc -I ./proto \
  --go_out ./proto --go_opt paths=source_relative \
  --go-grpc_out ./proto --go-grpc_opt paths=source_relative \
  --grpc-gateway_out ./proto --grpc-gateway_opt paths=source_relative \
  ./proto/data.proto

And trying to registered the generated service and gateway at my main app, as:

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	_ "github.com/mattn/go-sqlite3"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "walistner/proto"
)

func main() {
	gRPcPort := ":50005"
	// Create a gRPC listener on TCP port
	lis, err := net.Listen("tcp", gRPcPort)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create a gRPC server object
	s := grpc.NewServer()

	// Attach the Greeter service to the server
	pb.RegisterEventsServer(s, server{})

	log.Println("Serving gRPC server on 0.0.0.0:50005")

	// Serve gRPC server
	grpcTerminated := make(chan struct{})
	go func() {
		if err := s.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
			close(grpcTerminated) // In case server is terminated without us requesting this
		}
	}()

	// Create a client connection to the gRPC server we just started
	// This is where the gRPC-Gateway proxies the requests
	//	gateWayTarget := fmt.Sprintf("0.0.0.0%s", gRPcPort)
	conn, err := grpc.DialContext(
		context.Background(),
		gRPcPort,
		grpc.WithBlock(),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalln("Failed to dial server:", err)
	}

	gwmux := runtime.NewServeMux()

    // This handle to use JSON ServerSentEvent, not the gRPC one
	//http.HandleFunc("/sse", passer.HandleSignal)
	// Register custom route for  GET /hello/{name}
	//err = gwmux.HandlePath("GET", "/sse", passer.HandleSignal)
	//if err != nil {
	//	fmt.Println("Error:", err)
	//}

    // This handler just to confirm server is up and running
	// Register custom route for  GET /hello/{name}
	err = gwmux.HandlePath("GET", "/hello/{name}", func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
		w.Write([]byte("hello " + pathParams["name"]))
	})
	if err != nil {
		fmt.Println("Error:", err)
	}

	// Register the gRPC ServerSentEvent
	err = pb.RegisterEventsHandler(context.Background(), gwmux, conn)
	if err != nil {
		log.Fatalln("Failed to register gateway:", err)
	}

	gwServer := &http.Server{
		Addr:    ":8090",
		Handler: allowCORS(gwmux),
	}

	log.Println("Serving gRPC-Gateway on http://localhost:8090")

	fmt.Println("run POST request of: http://localhost:8090/v1/rawdata/stream")
	fmt.Println("or run curl -X GET -k http://localhost:8090/v1/rawdata/stream")
	log.Fatal(gwServer.ListenAndServe()) // <- This line alone could be enough ang no need for all the lines after,

	// the application is probably doing other things and you will want to be
	// able to shutdown cleanly; passing in a context is a good method..
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Ensure cancel function is called eventually

	grpcWebTerminated := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		if err := gwServer.ListenAndServe(); err != nil {
			fmt.Printf("Web server (GRPC) shutdown: %s", err)
		}
		close(grpcWebTerminated) // In case server is terminated without us requesting this
	}()

	// Wait for the web server to shutdown OR the context to be cancelled...
	select {
	case <-ctx.Done():
		// Shutdown the servers (there are shutdown commands to request this)
	case <-grpcTerminated:
		// You may want to exit if this happens (will be due to unexpected error)
	case <-grpcWebTerminated:
		// You may want to exit if this happens (will be due to unexpected error)
	}
	// Wait for the goRoutines to complete
	<-grpcTerminated
	<-grpcWebTerminated

	// Listen to Ctrl+C (you can also do something else that prevents the program from exiting)
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	<-c
	if client.IsConnected() {
		passer.data <- sseData{
			event:   "notification",
			message: "Server is shut down at the host machine...",
		}
		client.Disconnect()
	}
}

// allowCORS allows Cross Origin Resoruce Sharing from any origin.
// Don't do this without consideration in production systems.
func allowCORS(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if origin := r.Header.Get("Origin"); origin != "" {
			w.Header().Set("Access-Control-Allow-Origin", origin)
		}
		h.ServeHTTP(w, r)
	})
}

And the main part that is having an issue, is ther gateway registration:

package main

import (
	"fmt"
	"log"
	"sync"
	pb "walistner/proto"

	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/types/known/anypb"
	"google.golang.org/protobuf/types/known/emptypb"
)

type server struct {
	pb.UnimplementedEventsServer
}

func (s server) StreamEvents(_ *emptypb.Empty, srv pb.Events_StreamEventsServer) error {
	if p, ok := peer.FromContext(srv.Context()); ok {
		fmt.Println("Client ip is:", p.Addr.String())
	}
	md := metadata.New(map[string]string{"Content-Type": "text/event-stream", "Connection": "keep-alive"})

	srv.SetHeader(md)

	//use wait group to allow process to be concurrent
	var wg sync.WaitGroup

	for {
		incomingInput := <-passer.data
		wg.Add(1)
		//go func(count int64) {
		go func() {
			defer wg.Done()
      
            myData := incomingInput.data <---------------This is a string

            // Convert data to *anypb.Any
			data, err := anypb.New(myData.(protoreflect.ProtoMessage)) <----- tried this but it failed
			if err != nil {
				//...
			}

			resp := pb.EventSource{
				Event: incomingInput.event,
				Data:  data, // <---------- this is required to be of type *anypb.Any
			}

			if err := srv.Send(&resp); err != nil {
				log.Printf("send error %v", err)
			}
		}()
	}

	wg.Wait()
	return nil
}

My data structure inside the go app is:

type sseData struct {
	event, message string
}
type DataPasser struct {
	data       chan sseData
	logs       chan string
	connection chan struct{} // To control maximum allowed clients connections
}

var passer *DataPasser

func init() {
	passer = &DataPasser{
		data:       make(chan sseData),
		logs:       make(chan string),
		connection: make(chan struct{}, maxClients),
	}
}

anypb#New takes a proto.Message, which is an alias to protoreflect.ProtoMessage, which is an interface that looks like this:

type ProtoMessage interface{
    ProtoReflect() Message
}

So to conform to that interface your type will need to implement that interface and return a protoreflect.Message