I came across this post (website channel stops when using QoS on channel if the number of message is more than QoS defined number of message) while troubleshooting an issue I’m facing with RabbitMQ in my Go application. The problem is, when I use QoS on a channel and the number of messages exceeds the defined limit, the channel stops receiving messages.
Here is the code I’m using:
package rmqcode
import (
"context"
"fmt"
"log"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func recWorker(worknum int, del *amqp.Delivery) {
println(fmt.Sprintf("start %d", worknum))
time.Sleep(time.Second * 2)
println(fmt.Sprintf("end %d", worknum))
del.Ack(true)
}
func Receiver() {
conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
3, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msg, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var wg sync.WaitGroup
wg.Add(1)
go func() {
var counter = 1
for msgs := range msg {
go recWorker(counter, &msgs)
counter = counter + 1
}
fmt.Println("stopped")
wg.Done()
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
wg.Wait()
}
func Send() {
conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s\n", body)
}
In my case, I’ve set the QoS to a specific number, but when the number of messages exceeds this number, the consumer doesn’t process further messages. I’ve tried running the code as shown in the post, but I still face the same problem. I suspect it’s related to how the messages are being acknowledged or possibly the prefetch count, but I’m not entirely sure.
Has anyone else encountered this issue when working with RabbitMQ and QoS? Any suggestions on how to resolve this or properly handle message queues without the channel freezing?