Queue Push Consumers in JetStream
A queue push consumer is analogous to a core NATS queue group, but designed to work with streams. Unlike a standard push consumer which only supports a single bound subscription at any time, a queue-based one supports multiple subscriptions bound to the consumer. Messages from the consumer will be distributed randomly among active subscribers which can freely come and go.
In this example, we will demonstrate how to create a durable queue push consumer and how to bind subscriptions to receive and process messages.
Note that as of NATS server v2.8.4, ephemeral queue push consumers are not supported. This means that the server does not currently keep track of these and will auto-cleanup if no active subscriptions are bound. You can, of course, create a durable and then delete once you are done with it, but if the deletion fails to happen (program crashes), you will need to be sure to check when the starts up again.
It is recommended to review the standard push consumer example in order to understand the general concepts. The queue-based variant is a small extension to this type of consumer.
$ nbe run jetstream/queue-push-consumer/goView the source code or learn how to run this example yourself
Code
package main
import (
"fmt"
"os"
"sync"
"time"
"github.com/nats-io/nats.go"
)
func main() {
Use the env variable if running in the container, otherwise use the default.
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
Create an unauthenticated connection to NATS.
nc, _ := nats.Connect(url)
defer nc.Drain()
Access the JetStreamContext for managing streams and consumers as well as for publishing and subscription convenience methods.
js, _ := nc.JetStream()
Declare a simple limits-based stream and populate the stream with a few messages.
streamName := "EVENTS"
js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"events.>"},
})
Durable (implicit)
Like the standard push consumer, the JetStreamContext
provides
a simple way to create an queue push consumer. The only additional
argument is the “group name”. If nats.Durable
is not passed, the
group name is used as the durable name as well.
fmt.Println("# Durable (implicit)")
sub1, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())
If we inspect the consumer info, we will notice a property that
was not defined for the non-queue push consumer. The DeliverGroup
is the unique name of the group of subscribers. Internally, this
corresponds to a core NATS queue group name which we will see
below.
info, _ := js.ConsumerInfo(streamName, "event-processor")
fmt.Printf("deliver group: %q\n", info.Config.DeliverGroup)
Using the same helper method, we can create another subscription in the group. This method implicitly checks for whether the consumer has been created and binds to it based on the subject and group name.
sub2, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())
As noted above, a queue push consumer relies on a core NATS
queue group for distributing messages to active members. As
a result, we can bind a subsription by using the DeliverSubject
and the DeliverGroup
Since messages are publishing to a dedicated subject and is
part of a group, we can also create a core NATS subscription
to join the group. As a reminder, the DeliverSubject
is
randomly generated by default, but this can be set explicitly
in the consumer config if desired.
sub3, _ := nc.QueueSubscribeSync(info.Config.DeliverSubject, info.Config.DeliverGroup)
fmt.Printf("deliver subject: %q\n", info.Config.DeliverSubject)
Now we can publish some messages to the stream to observe how they are distributed to the subscribers.
js.Publish("events.1", nil)
js.Publish("events.2", nil)
js.Publish("events.3", nil)
js.Publish("events.4", nil)
js.Publish("events.5", nil)
js.Publish("events.6", nil)
As noted in the push consumer example, subscriptions
enqueue messages proactively. When there are a group of
subscriptions, each will receive a different subset of the messages.
When calling NextMsg
this means, messages can be processed out
of order. There is no correlation with message order and
subscription creation order 😉. In fact, it is possible that
not all subscriptions will necessarily get a message.
msg, _ := sub1.NextMsg(time.Second)
if msg != nil {
fmt.Printf("sub1: received message %q\n", msg.Subject)
msg.Ack()
} else {
fmt.Println("sub1: receive timeout")
}
msg, _ = sub2.NextMsg(time.Second)
if msg != nil {
fmt.Printf("sub2: received message %q\n", msg.Subject)
msg.Ack()
} else {
fmt.Println("sub2: receive timeout")
}
msg, _ = sub3.NextMsg(time.Second)
if msg != nil {
fmt.Printf("sub3: received message %q\n", msg.Subject)
msg.Ack()
} else {
fmt.Println("sub3: receive timeout")
}
Since we created this consumer using the helper method, when we unsubscribe
(or call Drain
), the consumer will be deleted.
sub1.Unsubscribe()
sub2.Unsubscribe()
sub3.Unsubscribe()
Durable (explicit)
To create a (safe) durable consumer, use the AddConsumer
method. Although it may seem redundant, a durable name and
the deliver group name must be defined. This is simply because
the durable name is used for all consumer types, while the
deliver group is exclusive to the queue push consumer. In this
example, the same name is used as convention which is what the helper
method above did as well.
fmt.Println("\n# Durable (explicit)")
js.AddConsumer(streamName, &nats.ConsumerConfig{
Durable: "event-processor",
DeliverSubject: "my-subject",
DeliverGroup: "event-processor",
AckPolicy: nats.AckExplicitPolicy,
})
defer js.DeleteConsumer(streamName, "event-processor")
Wait for all 6 messages to be received before exiting.
wg := &sync.WaitGroup{}
wg.Add(6)
For this part, we will use async core NATS queue subscriptions. Since
core NATS subscriptions are JetStream-unaware, we must call msg.Ack
explicitly to notify the server that the message has been processed.
For a js.QueueSubscribe
handler, by default, messages will be auto-acked
unless the nats.ManualAck
subscription option is supplied.
sub1, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
fmt.Printf("sub1: received message %q\n", msg.Subject)
msg.Ack()
wg.Done()
})
sub2, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
fmt.Printf("sub2: received message %q\n", msg.Subject)
msg.Ack()
wg.Done()
})
sub3, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
fmt.Printf("sub3: received message %q\n", msg.Subject)
msg.Ack()
wg.Done()
})
wg.Wait()
}
Output
Network d7077068_default Creating Network d7077068_default Created Container d7077068-nats-1 Creating Container d7077068-nats-1 Created Container d7077068-nats-1 Starting Container d7077068-nats-1 Started # Durable (implicit) deliver group: "event-processor" deliver subject: "_INBOX.CYOssaH1azfwRG0Z9sLLFJ" sub1: received message "events.2" sub2: received message "events.1" sub3: received message "events.3" # Durable (explicit) sub2: received message "events.4" sub1: received message "events.1" sub1: received message "events.2" sub1: received message "events.3" sub1: received message "events.5" sub1: received message "events.6"