Core Publish-Subscribe in Messaging
This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:
- Delivery is at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
- There are two circumstances in which a published message won’t be delivered to a subscriber:
  - The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
  - There is a network interruption where the message is ultimately dropped
- Messages are published to subjects, which consist of one or more concrete tokens, e.g. greet.bob. Subscribers can use wildcards to show interest in a set of matching subjects.
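To make the wildcard point concrete, here is a minimal sketch of token-based subject matching. It assumes the usual NATS rules: `*` matches exactly one token and `>` matches one or more trailing tokens. The function `subject_matches` is a hypothetical helper written for illustration; it is not part of async_nats and is not how the server implements matching.

```rust
/// Hypothetical helper: returns true if `subject` matches `pattern`.
/// Assumed rules: `*` matches exactly one token, `>` matches one or
/// more trailing tokens and must be the last token of the pattern.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` consumes the rest of the subject, but only as the final token.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` accepts any single non-empty token.
            (Some("*"), Some(token)) => {
                if token.is_empty() {
                    return false;
                }
            }
            // Literal tokens must be equal.
            (Some(p), Some(s)) => {
                if p != s {
                    return false;
                }
            }
            // Both sides ran out together: full match.
            (None, None) => return true,
            // One side has leftover tokens: no match.
            _ => return false,
        }
    }
}

fn main() {
    for subject in ["greet.joe", "greet.sue", "greet.bob.hi", "hello.joe"] {
        println!(
            "greet.* matches {}: {}",
            subject,
            subject_matches("greet.*", subject)
        );
    }
    println!(
        "greet.> matches greet.bob.hi: {}",
        subject_matches("greet.>", "greet.bob.hi")
    );
}
```

Under these assumptions, a subscription on greet.* receives messages for greet.joe and greet.sue but not for greet.bob.hi, which has an extra token.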
Code
use futures::StreamExt;
use std::{env, str::from_utf8};
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
Use the NATS_URL environment variable if defined; otherwise fall back to the default.
let nats_url = env::var("NATS_URL")
.unwrap_or_else(|_| "nats://localhost:4222".to_string());
let client = async_nats::connect(nats_url).await?;
Publish a message to the subject greet.joe.
client.publish("greet.joe".into(), "hello".into()).await?;
Subscriber implements the Stream trait from the futures crate, so we can use StreamExt combinators such as take() to limit the number of messages consumed in this interaction.
let mut subscription =
client.subscribe("greet.*".to_string()).await?.take(3);
Publish to three different subjects matching the wildcard.
for subject in ["greet.sue", "greet.bob", "greet.pam"] {
client.publish(subject.into(), "hello".into()).await?;
}
Notice that the first message received is on greet.sue and not greet.joe, which was the first message published. This is because core NATS provides at-most-once quality of service (QoS): a subscriber must be connected and have registered interest in a subject for the server to relay messages to it.
while let Some(message) = subscription.next().await {
println!(
"{:?} received on {:?}",
from_utf8(&message.payload),
&message.subject
);
}
Ok(())
}
Output
Network 5408da7b_default  Creating
Network 5408da7b_default  Created
Container 5408da7b-nats-1  Creating
Container 5408da7b-nats-1  Created
Container 5408da7b-nats-1  Starting
Container 5408da7b-nats-1  Started
Ok("hello") received on "greet.sue"
Ok("hello") received on "greet.bob"
Ok("hello") received on "greet.pam"
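The absence of greet.joe in the output can be modeled without a server. The following is a toy, in-memory sketch of at-most-once delivery: a message is handed only to subscriptions that exist at publish time and is otherwise dropped. `ToyBroker` and its methods are hypothetical names invented for this illustration, and plain prefix matching stands in for the greet.* wildcard; none of this reflects how the NATS server is implemented.

```rust
use std::collections::VecDeque;

/// Hypothetical in-memory broker used only to illustrate at-most-once delivery.
struct ToyBroker {
    // Each subscription is a subject prefix plus its queue of delivered subjects.
    subscriptions: Vec<(String, VecDeque<String>)>,
}

impl ToyBroker {
    fn new() -> Self {
        Self { subscriptions: Vec::new() }
    }

    /// Registers interest in subjects starting with `prefix` and returns a subscription id.
    fn subscribe(&mut self, prefix: &str) -> usize {
        self.subscriptions.push((prefix.to_string(), VecDeque::new()));
        self.subscriptions.len() - 1
    }

    /// Delivers to subscriptions that exist right now and returns how many received it.
    /// Nothing is stored for future subscribers, so a return value of 0 means the message is lost.
    fn publish(&mut self, subject: &str) -> usize {
        let mut delivered = 0;
        for (prefix, inbox) in self.subscriptions.iter_mut() {
            if subject.starts_with(prefix.as_str()) {
                inbox.push_back(subject.to_string());
                delivered += 1;
            }
        }
        delivered
    }

    /// Pops the next delivered subject for the given subscription, if any.
    fn next_message(&mut self, id: usize) -> Option<String> {
        self.subscriptions[id].1.pop_front()
    }
}

fn main() {
    let mut broker = ToyBroker::new();

    // Published before anyone has subscribed: there is no one to deliver to.
    let early = broker.publish("greet.joe");
    println!("deliveries for greet.joe: {}", early);

    // Subscribe, then publish three more messages.
    let sub = broker.subscribe("greet.");
    for subject in ["greet.sue", "greet.bob", "greet.pam"] {
        broker.publish(subject);
    }

    // Only messages published after the subscription are received.
    while let Some(subject) = broker.next_message(sub) {
        println!("received on {}", subject);
    }
}
```

The design choice mirrored here is that the broker keeps no history: interest must be registered before a message is published for that message to be relayed.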