RabbitMQ Channel Freezing When Message Count Exceeds QoS Limit: How to Resolve?

I came across this post (website channel stops when using QoS on channel if the number of message is more than QoS defined number of message) while troubleshooting an issue I’m facing with RabbitMQ in my Go application. The problem is, when I use QoS on a channel and the number of messages exceeds the defined limit, the channel stops receiving messages.

Here is the code I’m using:

package rmqcode

import (
    "context"
    "fmt"
    "log"
    "sync"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Panicf("%s: %s", msg, err)
    }
}

func recWorker(worknum int, del *amqp.Delivery) {
    println(fmt.Sprintf("start %d", worknum))
    time.Sleep(time.Second * 2)
    println(fmt.Sprintf("end %d", worknum))
    del.Ack(true)
}

func Receiver() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        3,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msg, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        var counter = 1
        for msgs := range msg {
            go recWorker(counter, &msgs)
            counter = counter + 1
        }
        fmt.Println("stopped")
        wg.Done()
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    wg.Wait()
}

func Send() {
    conn, err := amqp.Dial("amqp://user:pass@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        true,    // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    body := "Hello World!"
    err = ch.PublishWithContext(ctx,
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    log.Printf(" [x] Sent %s\n", body)
}

In my case, I’ve set the QoS to a specific number, but when the number of messages exceeds this number, the consumer doesn’t process further messages. I’ve tried running the code as shown in the post, but I still face the same problem. I suspect it’s related to how the messages are being acknowledged or possibly the prefetch count, but I’m not entirely sure.

Has anyone else encountered this issue when working with RabbitMQ and QoS? Any suggestions on how to resolve this or properly handle message queues without the channel freezing?

I am facing the same issue , i have raised this issue on github, Channel returned by consume function auto close when QoS defined & number of message in queue is > QoS value · Issue #296 · rabbitmq/amqp091-go · GitHub