Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
  - The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
  - There is a network interruption where the message is ultimately dropped
- Messages are published to subjects, which consist of one or more concrete tokens, e.g. greet.bob. Subscribers can use wildcards to express interest in a set of matching subjects.
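To make the wildcard rule concrete, here is a minimal sketch of token-based subject matching. The helper subjectMatches is hypothetical and is not part of nats.go; the actual matching is done by the NATS server. It assumes the usual NATS wildcard semantics, where * stands for exactly one token and > stands for one or more trailing tokens, and it does not validate subjects.

```go
package main

import (
	"fmt"
	"strings"
)

// subjectMatches reports whether subject matches pattern, token by token.
// Hypothetical helper for illustration only: "*" matches exactly one token,
// ">" matches one or more trailing tokens and must be the last pattern token.
// It does not check that the subject itself is well formed.
func subjectMatches(pattern, subject string) bool {
	pt := strings.Split(pattern, ".")
	st := strings.Split(subject, ".")
	for i, p := range pt {
		if p == ">" {
			// ">" needs at least one remaining subject token.
			return i == len(pt)-1 && len(st) > i
		}
		if i >= len(st) {
			// The subject ran out of tokens before the pattern did.
			return false
		}
		if p != "*" && p != st[i] {
			return false
		}
	}
	// Without ">", both sides must have the same number of tokens.
	return len(pt) == len(st)
}

func main() {
	subjects := []string{"greet.joe", "greet.pam", "greet.bob.hi", "greet"}
	for _, s := range subjects {
		fmt.Printf("%q: greet.*=%v greet.>=%v\n",
			s, subjectMatches("greet.*", s), subjectMatches("greet.>", s))
	}
}
```

Under these assumptions, greet.* covers greet.joe and greet.pam but not greet.bob.hi, which has an extra token.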
Code
package main
import (
"fmt"
"os"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// Use the env variable if running in the container, otherwise use the default.
url := os.Getenv("NATS_URL")
if url == "" {
url = nats.DefaultURL
}
// Create an unauthenticated connection to NATS.
nc, _ := nats.Connect(url)
// Drain is a safe way to ensure all buffered messages that were published are sent and all buffered messages received on a subscription are processed before closing the connection.
defer nc.Drain()
// Messages are published to subjects. Although there are no subscribers, this will be published successfully.
nc.Publish("greet.joe", []byte("hello"))
// Let’s create a subscription on the greet.* wildcard.
sub, _ := nc.SubscribeSync("greet.*")
// For a synchronous subscription, we need to fetch the next message. However, since the publish occurred before the subscription was established, this is going to time out.
msg, _ := sub.NextMsg(10 * time.Millisecond)
fmt.Println("subscribed after a publish...")
fmt.Printf("msg is nil? %v\n", msg == nil)
// Publish a couple of messages.
nc.Publish("greet.joe", []byte("hello"))
nc.Publish("greet.pam", []byte("hello"))
// Since the subscription is established, the published messages will immediately be broadcast to all subscriptions. They will land in their buffer for subsequent NextMsg calls.
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
// One more for good measure.
nc.Publish("greet.bob", []byte("hello"))
msg, _ = sub.NextMsg(10 * time.Millisecond)
fmt.Printf("msg data: %q on subject %q\n", string(msg.Data), msg.Subject)
}
Output
Network 6b32c814_default  Creating
Network 6b32c814_default  Created
Container 6b32c814-nats-1  Creating
Container 6b32c814-nats-1  Created
Container 6b32c814-nats-1  Starting
Container 6b32c814-nats-1  Started
subscribed after a publish...
msg is nil? true
msg data: "hello" on subject "greet.joe"
msg data: "hello" on subject "greet.pam"
msg data: "hello" on subject "greet.bob"