Concurrent Message Processing in Messaging
By default, when a subscription is created, each message it receives is processed sequentially. Multiple subscriptions can be set up in a [queue group][queue], in which case the NATS server delivers each message to one member of the group.
However, even within a subscription, it may be desirable to handle messages concurrently. This example shows how this can be achieved in the clients that support it.
Code
use futures::stream::StreamExt;
use rand::Rng;
use std::{env, str::from_utf8, time::Duration};

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    // Use the NATS_URL env variable if defined, otherwise fall back to the default.
    let nats_url = env::var("NATS_URL")
        .unwrap_or_else(|_| "nats://localhost:4222".to_string());

    let client = async_nats::connect(nats_url).await?;
    // `Subscriber` implements the `Stream` trait (the async counterpart of
    // Rust's `Iterator`), so we can use combinators like `take()` to limit
    // the number of messages consumed in this interaction.
    let subscription = client.subscribe("greet.*".to_string()).await?.take(50);
    // Publish a set of messages, each with an order identifier.
    for i in 0..50 {
        client
            .publish("greet.joe".to_string(), format!("hello {}", i).into())
            .await?;
    }
    // Iterate over messages concurrently. `for_each_concurrent` lets us
    // receive the next message immediately instead of waiting for a
    // time-consuming operation to finish. `25` is the limit on the number
    // of concurrent operations.
    subscription
        .for_each_concurrent(25, |message| async move {
            // Simulate an expensive operation.
            let num = rand::thread_rng().gen_range(0..500);
            tokio::time::sleep(Duration::from_millis(num)).await;

            // Print the result after the sleep.
            println!(
                "received message: {:?}",
                from_utf8(&message.payload).unwrap()
            )
        })
        .await;

    Ok(())
}
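To make the meaning of the concurrency limit concrete, here is a minimal standalone sketch that needs no NATS server. It is not how `for_each_concurrent` works internally: it swaps async futures for a pool of OS threads, and the function name `process_bounded` and the simulated delay are hypothetical, chosen only for illustration. What it shares with the example is the contract: 50 messages go in, and no more than 25 handlers run at any moment.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: handles `messages` with at most `limit` handlers
/// running at once. Returns the payloads in completion order together with
/// the peak number of handlers observed running simultaneously.
fn process_bounded(messages: Vec<String>, limit: usize) -> (Vec<String>, usize) {
    let (work_tx, work_rx) = mpsc::channel::<String>();
    let work_rx = Arc::new(Mutex::new(work_rx));
    let (done_tx, done_rx) = mpsc::channel::<String>();
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    // `limit` worker threads share one queue, so `limit` bounds concurrency.
    let mut workers = Vec::new();
    for _ in 0..limit {
        let work_rx = Arc::clone(&work_rx);
        let done_tx = done_tx.clone();
        let in_flight = Arc::clone(&in_flight);
        let peak = Arc::clone(&peak);
        workers.push(thread::spawn(move || loop {
            // The lock is held only while taking the next message.
            let next = work_rx.lock().unwrap().recv();
            let Ok(msg) = next else { break };

            let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);

            // Simulated work (an assumption): a short, payload-dependent delay.
            thread::sleep(Duration::from_millis((msg.len() as u64 % 5) * 2));

            in_flight.fetch_sub(1, Ordering::SeqCst);
            done_tx.send(msg).unwrap();
        }));
    }
    drop(done_tx);

    for message in messages {
        work_tx.send(message).unwrap();
    }
    // Closing the queue lets idle workers exit their loop.
    drop(work_tx);

    for worker in workers {
        worker.join().unwrap();
    }
    let completed: Vec<String> = done_rx.iter().collect();
    (completed, peak.load(Ordering::SeqCst))
}

fn main() {
    let messages: Vec<String> = (0..50).map(|i| format!("hello {}", i)).collect();
    let (completed, peak) = process_bounded(messages, 25);
    println!(
        "processed {} messages, peak concurrency {}",
        completed.len(),
        peak
    );
}
```

Running the sketch prints a single summary line. The peak never exceeds the limit, while the completion order can differ from run to run, which is the same effect the NATS example shows with its shuffled `received message` lines.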
Output
Network 3b5be259_default  Creating
Network 3b5be259_default  Created
Container 3b5be259-nats-1  Creating
Container 3b5be259-nats-1  Created
Container 3b5be259-nats-1  Starting
Container 3b5be259-nats-1  Started
received message: "hello 10"
received message: "hello 22"
received message: "hello 23"
received message: "hello 0"
received message: "hello 21"
received message: "hello 18"
received message: "hello 24"
received message: "hello 17"
received message: "hello 1"
received message: "hello 4"
received message: "hello 5"
received message: "hello 27"
received message: "hello 33"
received message: "hello 8"
received message: "hello 15"
received message: "hello 29"
received message: "hello 3"
received message: "hello 2"
received message: "hello 37"
received message: "hello 40"
received message: "hello 16"
received message: "hello 25"
received message: "hello 32"
received message: "hello 13"
received message: "hello 31"
received message: "hello 6"
received message: "hello 39"
received message: "hello 19"
received message: "hello 7"
received message: "hello 44"
received message: "hello 45"
received message: "hello 26"
received message: "hello 14"
received message: "hello 46"
received message: "hello 11"
received message: "hello 35"
received message: "hello 20"
received message: "hello 9"
received message: "hello 12"
received message: "hello 34"
received message: "hello 28"
received message: "hello 38"
received message: "hello 30"
received message: "hello 42"
received message: "hello 49"
received message: "hello 41"
received message: "hello 36"
received message: "hello 47"
received message: "hello 43"
received message: "hello 48"
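The messages arrive in publish order but finish in a different one, because each handler sleeps for a random duration. Since every payload carries its order identifier, the original sequence can be recovered after the fact if a caller needs it. The following is a small sketch of that idea, not part of the NATS example: the helper `order_id` is hypothetical, and the sample payloads are the first five lines of the output above.

```rust
/// Hypothetical helper: extracts the numeric identifier from a payload of
/// the form "hello N". Returns `None` for payloads that do not match.
fn order_id(payload: &str) -> Option<u32> {
    payload.strip_prefix("hello ")?.parse().ok()
}

fn main() {
    // Completion order as seen in the first five output lines.
    let mut received = vec!["hello 10", "hello 22", "hello 23", "hello 0", "hello 21"];

    // Sort by the identifier embedded in each payload to restore publish order.
    received.sort_by_key(|payload| order_id(payload));

    println!("{:?}", received);
    // prints ["hello 0", "hello 10", "hello 21", "hello 22", "hello 23"]
}
```

Sorting only works once all results are in hand, so this suits batch-style use; a handler that must act on messages strictly in order is better served by the default sequential processing.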