Getting Started

This page covers the core pub/sub surface: connecting, publishing, subscribing, wildcards, queue groups, connection events, and request/reply.

Prerequisites

Start a NATS server:

docker run -p 4222:4222 nats:latest

The snippets below share these imports and helpers:

import cats.effect.IO
import com.comcast.ip4s.{Host, Port}
import fs2.Chunk
import fs2.nats.client.{ClientConfig, ClientEvent, NatsClient}
import fs2.nats.protocol.Headers
import fs2.nats.subscriptions.NatsMessage

val host = Host.fromString("localhost").get
val port = Port.fromInt(4222).get

def processWork(msg: NatsMessage): IO[Unit] = IO.println(s"processing ${msg.subject}")

Basic usage

NatsClient.connect hands you the client as a Resource; a subscription is in turn a Resource over an fs2.Stream[F, NatsMessage]. Nothing connects or subscribes until the Resource is used.

val config = ClientConfig(host = host, port = port)

val program: IO[Unit] =
  NatsClient.connect[IO](config).use { client =>
    client.subscribe("hello.world").use { messages =>
      for
        _   <- client.publish("hello.world", Chunk.array("Hello, NATS!".getBytes))
        msg <- messages.take(1).compile.lastOrError
        _   <- IO.println(s"Received: ${msg.payloadAsString}")
      yield ()
    }
  }

Publishing with headers

NATS 2.2+ headers are first class. Build a Headers value and pass it to publish:

def publishWithHeaders(client: NatsClient[IO]): IO[Unit] =
  val headers = Headers(
    "X-Request-Id" -> "abc123",
    "X-Timestamp"  -> System.currentTimeMillis().toString
  )
  client.publish("events.created", Chunk.array("""{"id": 1}""".getBytes), headers)

Wildcard subscriptions

* matches a single token; > matches one or more trailing tokens:

// Subscribe to all events under events.*
def singleToken(client: NatsClient[IO]): IO[Unit] =
  client.subscribe("events.*").use { messages =>
    messages.evalMap(msg => IO.println(s"${msg.subject}: ${msg.payloadAsString}")).compile.drain
  }

// Subscribe to events.a, events.a.b, events.a.b.c, ...
def multiToken(client: NatsClient[IO]): IO[Unit] =
  client.subscribe("events.>").use(_.compile.drain)

Queue groups (load balancing)

Subscribers sharing a queue group split the messages between them:

def worker(client: NatsClient[IO]): IO[Unit] =
  client.subscribe("work.queue", queueGroup = Some("workers")).use { messages =>
    messages.evalMap(processWork).compile.drain
  }

Connection events

client.events is a Stream[F, ClientEvent] reporting connection lifecycle, slow consumers, and protocol errors:

def watchEvents(client: NatsClient[IO]): IO[Unit] =
  client.events.evalMap {
    case ClientEvent.Connected(info) =>
      IO.println(s"Connected to ${info.serverId}")
    case ClientEvent.Disconnected(reason, willReconnect) =>
      IO.println(s"Disconnected: $reason, reconnecting: $willReconnect")
    case ClientEvent.Reconnected(info, attempt) =>
      IO.println(s"Reconnected to ${info.serverId} after $attempt attempts")
    case ClientEvent.SlowConsumer(sid, subject, dropped) =>
      IO.println(s"Slow consumer on $subject, dropped $dropped messages")
    case other =>
      IO.println(s"Event: $other")
  }.compile.drain

Request/Reply

request publishes to a shared response inbox and awaits a single reply. It fails fast with NatsError.NoResponders if nobody is listening (503), or NatsError.Timeout if no reply arrives within the timeout:

def echo(client: NatsClient[IO]): IO[NatsMessage] =
  client.request("service.echo", Chunk.array("ping".getBytes))

Next up: JetStream for persistence and consumers.