Core Publish-Subcribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances when a published message won’t be delivered to a subscriber:
- The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
- There is a network interruption where the message is ultimately dropped
- Messages are published to subjects which can be one or more concrete tokens, e.g.
greet.bob
. Subscribers can utilize wildcards to show interest on a set of matching subjects.
Code
import {connect, StringCodec} from "nats";
Get the passed NATS_URL or fallback to the default. This can be a comma-separated string.
const servers = process.env.NATS_URL || "nats://localhost:4222";
Create a client connection to an available NATS server.
const nc = await connect({
servers: servers.split(","),
});
NATS message payloads are byte arrays, so we need to have a codec to serialize and deserialize payloads in order to work with them. Another built-in codec is JSONCodec or you can implement your own.
const sc = StringCodec();
To publish a message, simply provide the subject of the message
and encode the message payload. NATS subjects are hierarchical using
periods as token delimiters. greet
and joe
are two distinct tokens.
nc.publish("greet.bob", sc.encode("hello"));
Now we are going to create a subscription and utilize a wildcard on
the second token. The effect is that this subscription shows interest
in all messages published to a subject with two tokens where the first
is greet
.
let sub = nc.subscribe("greet.*", {max: 3});
const done = (async () => {
for await (const msg of sub) {
console.log(`${sc.decode(msg.data)} on subject ${msg.subject}`);
}
})()
Let’s publish three more messages which will result in the messages being forwarded to the local subscription we have.
nc.publish("greet.joe", sc.encode("hello"));
nc.publish("greet.pam", sc.encode("hello"));
nc.publish("greet.sue", sc.encode("hello"));
This will wait until the above async subscription handler finishes
processing the three messages. Note that the first message to
greet.bob
was not printed. This is because the subscription was
created after the publish. Core NATS provides at-most-once quality
of service (QoS) for active subscriptions.
await done;
Finally we drain the connection which waits for any pending messages (published or in a subscription) to be flushed.
await nc.drain();
Output
Network 6a820af9_default Creating Network 6a820af9_default Created Container 6a820af9-nats-1 Creating Container 6a820af9-nats-1 Created Container 6a820af9-nats-1 Starting Container 6a820af9-nats-1 Started hello on subject greet.joe hello on subject greet.pam hello on subject greet.sue