I have a 2 project where the 1st project works as a Producer for Rabbit MQ and Sends data to a Queue. 2nd project works as a Consumer need to always listen to if the queue got any news data if it does then submit those data to a database. How can I create this Listener for my 2nd Project in Golang?
So far I tried…
- RabbitMQ Consumer part
- Build a scheduler using time.Ticker
Result:
Stop the server then again works correctly but as this is a web based microservice it’s not possible to stop the server.
package main
import (
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"log"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
ticker := time.NewTicker(time.Second * 5)
go scheduler(ticker)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
ticker.Stop()
//os.Exit(1)
}
func scheduler(ticker *time.Ticker) {
for ;true;<- ticker.C {
Consumers()
}
}
func HandleError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
type RabbitLocationData struct {
Domain string `json:"domain"`
UserId string `json:"user_id"`
ClientTimeStampUTC float64 `json:"client_timestamp_utc"`
ServerTimeStampUTC float64 `json:"server_timestamp_utc"`
Longitude float64 `json:"lon"`
Latitude float64 `json:"lat"`
}
func (rld RabbitLocationData) ToString() string {
result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
"clientTimestampUtc=%f, "+
"serverTimestampUtc=%f, "+
"latitude=%f, "+
"longitude=%f, "+
"domain='%s'"+
"}",
rld.UserId,
rld.ClientTimeStampUTC,
rld.ServerTimeStampUTC,
rld.Latitude,
rld.Longitude,
rld.Domain)
return result
}
func Consumers() {
// Make a connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
HandleError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// Create a channel
ch, err := conn.Channel()
HandleError(err, "Failed to open a channel")
defer ch.Close()
// Declare a queue
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
HandleError(err, "could not declare 'hello' queue")
err = ch.Qos(1, 0, false)
HandleError(err, "could not configure Qos")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
HandleError(err, "could not register consumer")
stopChannel := make(chan bool)
go func() {
log.Printf("Consumer ready, PID: %d", os.Getpid())
for d := range msgs {
log.Printf("Received a message: %s", string(d.Body))
locationData := &RabbitLocationData{}
err := json.Unmarshal(d.Body, locationData)
if err != nil {
log.Printf("Error decoding JSON: %s", err)
}
log.Printf("Result: %s", locationData.ToString())
if err := d.Ack(false); err != nil {
log.Printf("Error acknowledging message: %s", err)
} else {
log.Printf("Acknowledged message")
}
}
}()
log.Printf("[*] waiting for messages. To exit press CTRL+C")
<-stopChannel
}