Multi-Stream Consumption in JetStream
There may be use cases where a fan-in of messages across streams
may be desired. One way to achieve this is to create a push consumer
per stream and specify the same DeliverSubject
and, optionally,
DeliverGroup
. This will result in each consumer delivering messages
to clients subscribed to the subject and/or part of a queue group.
This example will demonstrate how to configure the consumers and subscription to achieve this fan-in consumption.
$ nbe run jetstream/multi-stream-consumption/goView the source code or learn how to run this example yourself
Code
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL
}
nc, _ := nats.Connect(natsURL)
js, _ := nc.JetStream()
Create a stream for each region.
js.AddStream(&nats.StreamConfig{
Name: "EVENTS-EU",
Subjects: []string{"events.eu.>"},
})
js.AddStream(&nats.StreamConfig{
Name: "EVENTS-US",
Subjects: []string{"events.us.>"},
})
Create a consumer for each stream. Both publish to the same deliver subject. This is a straightforward way to do this in a single account. It is recommended a user is created with specific permissions to subscribe to this subject.
js.AddConsumer("EVENTS-EU", &nats.ConsumerConfig{
Durable: "processor",
DeliverSubject: "push.events",
DeliverGroup: "processor",
AckPolicy: nats.AckExplicitPolicy,
})
js.AddConsumer("EVENTS-US", &nats.ConsumerConfig{
Durable: "processor",
DeliverSubject: "push.events",
DeliverGroup: "processor",
AckPolicy: nats.AckExplicitPolicy,
})
Publish messages to each stream.
js.Publish("events.eu.page_loaded", nil)
js.Publish("events.eu.input_focused", nil)
js.Publish("events.us.page_loaded", nil)
js.Publish("events.us.mouse_clicked", nil)
js.Publish("events.eu.mouse_clicked", nil)
js.Publish("events.us.input_focused", nil)
Subscribe to the deliver subject with core NATS subscription. Observe that messages from both streams are being received and can be ack’ed.
sub, _ := nc.QueueSubscribeSync("push.events", "processor")
defer sub.Drain()
for {
msg, err := sub.NextMsg(time.Second)
if err == nats.ErrTimeout {
break
}
fmt.Println(msg.Subject)
msg.Ack()
}
Confirm the consumer state is updated.
info1, _ := js.ConsumerInfo("EVENTS-EU", "processor")
fmt.Printf("eu: last delivered: %d, num pending: %d\n", info1.Delivered.Stream, info1.NumPending)
info2, _ := js.ConsumerInfo("EVENTS-US", "processor")
fmt.Printf("us: last delivered: %d, num pending: %d\n", info2.Delivered.Stream, info2.NumPending)
}
Output
Network 59bfde6d_default Creating Network 59bfde6d_default Created Container 59bfde6d-nats-1 Creating Container 59bfde6d-nats-1 Created Container 59bfde6d-nats-1 Starting Container 59bfde6d-nats-1 Started events.eu.page_loaded events.eu.input_focused events.us.page_loaded events.eu.mouse_clicked events.us.mouse_clicked events.us.input_focused eu: last delivered: 3, num pending: 0 us: last delivered: 3, num pending: 0