Cron job for RabbitMQ Consumer

I have a 2 project where the 1st project works as a Producer for Rabbit MQ and Sends data to a Queue. 2nd project works as a Consumer need to always listen to if the queue got any news data if it does then submit those data to a database. How can I create this Listener for my 2nd Project in Golang?

So far I tried…

  1. RabbitMQ Consumer part
  2. Build a scheduler using time.Ticker

Result:
Stop the server then again works correctly but as this is a web based microservice it’s not possible to stop the server.

package main

import (
	"encoding/json"
	"fmt"
	"github.com/streadway/amqp"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	ticker := time.NewTicker(time.Second * 5)
	go scheduler(ticker)
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	ticker.Stop()
	//os.Exit(1)
}

func scheduler(ticker *time.Ticker) {
	for ;true;<- ticker.C {
		Consumers()
	}
}

func HandleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

type RabbitLocationData struct {
	Domain             string  `json:"domain"`
	UserId             string  `json:"user_id"`
	ClientTimeStampUTC float64  `json:"client_timestamp_utc"`
	ServerTimeStampUTC float64  `json:"server_timestamp_utc"`
	Longitude          float64 `json:"lon"`
	Latitude           float64 `json:"lat"`
}

func (rld RabbitLocationData) ToString() string {
	result := fmt.Sprintf("RabbitLocationData{userId='%s', "+
		"clientTimestampUtc=%f, "+
		"serverTimestampUtc=%f, "+
		"latitude=%f, "+
		"longitude=%f, "+
		"domain='%s'"+
		"}",
		rld.UserId,
		rld.ClientTimeStampUTC,
		rld.ServerTimeStampUTC,
		rld.Latitude,
		rld.Longitude,
		rld.Domain)
	return result
}


func Consumers() {
	// Make a connection
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	HandleError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// Create a channel
	ch, err := conn.Channel()
	HandleError(err, "Failed to open a channel")
	defer ch.Close()

	// Declare a queue
	q, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil,
	)
	HandleError(err, "could not declare 'hello' queue")

	err = ch.Qos(1, 0, false)

	HandleError(err, "could not configure Qos")

	msgs, err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	HandleError(err, "could not register consumer")
	stopChannel := make(chan bool)

	go func() {
		log.Printf("Consumer ready, PID: %d", os.Getpid())
		for d := range msgs {
			log.Printf("Received a message: %s", string(d.Body))
			locationData := &RabbitLocationData{}

			err := json.Unmarshal(d.Body, locationData)

			if err != nil {
				log.Printf("Error decoding JSON: %s", err)
			}

			log.Printf("Result: %s", locationData.ToString())

			if err := d.Ack(false); err != nil {
				log.Printf("Error acknowledging message: %s", err)
			} else {
				log.Printf("Acknowledged message")
			}
		}
	}()

	log.Printf("[*] waiting for messages. To exit press CTRL+C")
	<-stopChannel
}

Why do you run Consumers every five seconds when the AMQP consumer started in this function is pushed messages by RabbitMQ forever?

Just open the AMQP connection and the channel once, start the consumer, consume the messages pushed to it by the AMPQ message in a loop and handle each received message.

I call Consumer every five second because
Suppose I push some messages to rabbitmq’s queue then
from another project run, it consume those queued messages for 1st time when the project run but after consume acknowledge those messages and wait for new one but in the meanwhile the queue updated so 2nd project can check if their any new messages to consume.

That’s not how RabbitMQ works. If you have two consumers consuming from the same queue, they are pushed messages basically using a round robin algorithm. It is possible to configure a consumer so that it is only pushed a maximum number of messages to consume and acknowledge (search for “consumer prefetch”), but basically RabbitMQ evenly pushes messages to all connected consumers.

Just write you application so that it opens a connection and a channel once, creates a consumer and lets it consumer forever. If you start this application twice, RabbitMQ will distribute and push the messages evenly to both consumers.

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.