NATS Logo by Example

Core Publish-Subcribe in Messaging

This example demonstrates the core NATS publish-subscribe behavior. This is the fundamental pattern that all other NATS patterns and higher-level APIs build upon. There are a few takeaways from this example:

  • Delivery is an at-most-once. For MQTT users, this is referred to as Quality of Service (QoS) 0.
  • There are two circumstances when a published message won’t be delivered to a subscriber:
    • The subscriber does not have an active connection to the server (i.e. the client is temporarily offline for some reason)
    • There is a network interruption where the message is ultimately dropped
  • Messages are published to subjects which can be one or more concrete tokens, e.g. greet.bob. Subscribers can utilize wildcards to show interest on a set of matching subjects.
CLI Go Python Deno Node Rust C# Java Ruby Elixir C
Jump to the output or the recording
$ nbe run messaging/pub-sub/java
View the source code or learn how to run this example yourself

Code

package example;


import io.nats.client.Connection;
import io.nats.client.Nats;
import io.nats.client.Dispatcher;


import java.io.IOException;
import java.nio.charset.StandardCharsets;


public class Main {
  public static void main(String[] args) {
    String natsURL = System.getenv("NATS_URL");
    if (natsURL == null) {
        natsURL = "nats://127.0.0.1:4222";
    }

Initialize a connection to the server. The connection is AutoCloseable on exit.

    try (Connection nc = Nats.connect(natsURL)) {

Prepare a simple message body and publish a message to a subject.

        byte[] messageBytes = "hello".getBytes(StandardCharsets.UTF_8);
        nc.publish("greet.joe", messageBytes);

Create a message dispatcher for handling messages in a separate thread and then subscribe to the target subject which leverages a wildcard greet.*.

        Dispatcher dispatcher = nc.createDispatcher((msg) -> {
            System.out.printf("%s on subject %s\n",
                new String(msg.getData(), StandardCharsets.UTF_8),
                msg.getSubject());
        });


        dispatcher.subscribe("greet.*");

Publish more messages that will be received by the subscription since they match the wildcard. Note the first message on greet.joe was not received because we were not subscribed when it was published

        nc.publish("greet.bob", messageBytes);
        nc.publish("greet.sue", messageBytes);
        nc.publish("greet.pam", messageBytes);

Sleep this thread a little so the dispatcher thread has time to receive all the messages before the program quits.

        Thread.sleep(200);


    } catch (InterruptedException | IOException e) {
      e.printStackTrace();
    }
  }
}

Output

Network ac43372d_default  Creating
Network ac43372d_default  Created
Container ac43372d-nats-1  Creating
Container ac43372d-nats-1  Created
Container ac43372d-nats-1  Starting
Container ac43372d-nats-1  Started
hello on subject greet.bob
hello on subject greet.sue
hello on subject greet.pam

Recording

Note, playback is half speed to make it a bit easier to follow.