NATS Logo by Example

Queue Push Consumers in JetStream

A queue push consumer is analogous to a core NATS queue group, but designed to work with streams. Unlike a standard push consumer which only supports a single bound subscription at any time, a queue-based one supports multiple subscriptions bound to the consumer. Messages from the consumer will be distributed randomly among active subscribers which can freely come and go.

In this example, we will demonstrate how to create a durable queue push consumer and how to bind subscriptions to receive and process messages.

Note that as of NATS server v2.8.4, ephemeral queue push consumers are not supported. This means that the server does not currently keep track of these and will auto-cleanup if no active subscriptions are bound. You can, of course, create a durable and then delete once you are done with it, but if the deletion fails to happen (program crashes), you will need to be sure to check when the starts up again.

It is recommended to review the standard push consumer example in order to understand the general concepts. The queue-based variant is a small extension to this type of consumer.

CLI Go Python Deno Node Rust C# Java Ruby Elixir C
Jump to the output or the recording
$ nbe run jetstream/queue-push-consumer/go
View the source code or learn how to run this example yourself

Code

package main


import (
	"fmt"
	"os"
	"sync"
	"time"


	"github.com/nats-io/nats.go"
)


func main() {

Use the env variable if running in the container, otherwise use the default.

	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

Create an unauthenticated connection to NATS.

	nc, _ := nats.Connect(url)
	defer nc.Drain()

Access the JetStreamContext for managing streams and consumers as well as for publishing and subscription convenience methods.

	js, _ := nc.JetStream()

Declare a simple limits-based stream and populate the stream with a few messages.

	streamName := "EVENTS"


	js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"events.>"},
	})

Durable (implicit)

Like the standard push consumer, the JetStreamContext provides a simple way to create an queue push consumer. The only additional argument is the “group name”. If nats.Durable is not passed, the group name is used as the durable name as well.

	fmt.Println("# Durable (implicit)")
	sub1, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())

If we inspect the consumer info, we will notice a property that was not defined for the non-queue push consumer. The DeliverGroup is the unique name of the group of subscribers. Internally, this corresponds to a core NATS queue group name which we will see below.

	info, _ := js.ConsumerInfo(streamName, "event-processor")
	fmt.Printf("deliver group: %q\n", info.Config.DeliverGroup)

Using the same helper method, we can create another subscription in the group. This method implicitly checks for whether the consumer has been created and binds to it based on the subject and group name.

	sub2, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())

As noted above, a queue push consumer relies on a core NATS queue group for distributing messages to active members. As a result, we can bind a subsription by using the DeliverSubject and the DeliverGroup Since messages are publishing to a dedicated subject and is part of a group, we can also create a core NATS subscription to join the group. As a reminder, the DeliverSubject is randomly generated by default, but this can be set explicitly in the consumer config if desired.

	sub3, _ := nc.QueueSubscribeSync(info.Config.DeliverSubject, info.Config.DeliverGroup)
	fmt.Printf("deliver subject: %q\n", info.Config.DeliverSubject)

Now we can publish some messages to the stream to observe how they are distributed to the subscribers.

	js.Publish("events.1", nil)
	js.Publish("events.2", nil)
	js.Publish("events.3", nil)
	js.Publish("events.4", nil)
	js.Publish("events.5", nil)
	js.Publish("events.6", nil)

As noted in the push consumer example, subscriptions enqueue messages proactively. When there are a group of subscriptions, each will receive a different subset of the messages. When calling NextMsg this means, messages can be processed out of order. There is no correlation with message order and subscription creation order 😉. In fact, it is possible that not all subscriptions will necessarily get a message.

	msg, _ := sub1.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub1: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub1: receive timeout")
	}


	msg, _ = sub2.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub2: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub2: receive timeout")
	}


	msg, _ = sub3.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub3: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub3: receive timeout")
	}

Since we created this consumer using the helper method, when we unsubscribe (or call Drain), the consumer will be deleted.

	sub1.Unsubscribe()
	sub2.Unsubscribe()
	sub3.Unsubscribe()

Durable (explicit)

To create a (safe) durable consumer, use the AddConsumer method. Although it may seem redundant, a durable name and the deliver group name must be defined. This is simply because the durable name is used for all consumer types, while the deliver group is exclusive to the queue push consumer. In this example, the same name is used as convention which is what the helper method above did as well.

	fmt.Println("\n# Durable (explicit)")


	js.AddConsumer(streamName, &nats.ConsumerConfig{
		Durable:        "event-processor",
		DeliverSubject: "my-subject",
		DeliverGroup:   "event-processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	defer js.DeleteConsumer(streamName, "event-processor")

Wait for all 6 messages to be received before exiting.

	wg := &sync.WaitGroup{}
	wg.Add(6)

For this part, we will use async core NATS queue subscriptions. Since core NATS subscriptions are JetStream-unaware, we must call msg.Ack explicitly to notify the server that the message has been processed. For a js.QueueSubscribe handler, by default, messages will be auto-acked unless the nats.ManualAck subscription option is supplied.

	sub1, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub1: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})
	sub2, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub2: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})
	sub3, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub3: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})


	wg.Wait()
}

Output

Network d7077068_default  Creating
Network d7077068_default  Created
Container d7077068-nats-1  Creating
Container d7077068-nats-1  Created
Container d7077068-nats-1  Starting
Container d7077068-nats-1  Started
# Durable (implicit)
deliver group: "event-processor"
deliver subject: "_INBOX.CYOssaH1azfwRG0Z9sLLFJ"
sub1: received message "events.2"
sub2: received message "events.1"
sub3: received message "events.3"

# Durable (explicit)
sub2: received message "events.4"
sub1: received message "events.1"
sub1: received message "events.2"
sub1: received message "events.3"
sub1: received message "events.5"
sub1: received message "events.6"

Recording

Note, playback is half speed to make it a bit easier to follow.