JetStream

JetStream is obtained as a Resource over a connected client (requires a JetStream-enabled server, e.g. nats-server -js). It owns the publish window and supervised fibers, and releases them with the Resource.

The snippets below share these imports and helpers:

import cats.effect.IO
import cats.syntax.all.*
import scala.concurrent.duration.*
import fs2.Chunk
import fs2.nats.client.NatsClient
import fs2.nats.jetstream.*
import fs2.nats.jetstream.protocol.*

def process(payload: Chunk[Byte]): IO[Unit] = IO.println(s"got ${payload.size} bytes")

Streams and persistent publish

Create a stream, then publish to it with a PubAck. The optional Nats-Msg-Id (set via PublishOptions.msgId) enables server-side de-duplication:

def jetStreamBasics(client: NatsClient[IO]): IO[Unit] =
  client.jetStream().use { js =>
    for
      // Stream management
      _   <- js.addStream(StreamConfig(name = "ORDERS", subjects = List("orders.>")))

      // Persistent publish with PubAck (+ dedup via Nats-Msg-Id)
      ack <- js.publish(
               "orders.new",
               Chunk.array("order #1".getBytes),
               opts = PublishOptions(msgId = Some("order-1"))
             )
      _   <- IO.println(s"stored seq=${ack.seq} duplicate=${ack.duplicate}")

      // Pull consumer: create + fetch + ack
      c    <- js.createConsumer(
                "ORDERS",
                ConsumerConfig(durable = Some("workers"), filterSubject = Some("orders.new"))
              )
      msgs <- c.fetch(batch = 10, maxWait = 2.seconds)
      _    <- msgs.traverse_(m => process(m.payload) *> m.ack)
    yield ()
  }

Continuous pull consumption

consume owns a background pull loop as part of the Resource; it re-issues its request on a cadence so it resumes after a dropped connection:

def consumeLoop(c: JsConsumer[IO]): IO[Unit] =
  c.consume().use { stream =>
    stream.evalMap(m => process(m.payload) *> m.ack).compile.drain
  }

Push consumption

Durable or ephemeral, with an optional queue group. Idle heartbeats are filtered and flow-control requests answered automatically; ephemeral consumers are deleted on release:

def pushConsume(client: NatsClient[IO]): IO[Unit] =
  client.jetStream().use { js =>
    js.subscribePush(
        "ORDERS",
        ConsumerConfig(durable = Some("push-workers"), deliverGroup = Some("workers"))
      )
      .use(_.evalMap(m => process(m.payload) *> m.ack).compile.drain)
  }

Ack semantics

Each delivered JsMessage carries the acks:

Finalizing acks take effect once; inProgress is repeatable.

Reconnect: push and pull subscriptions ride the client's automatic subscription replay on reconnect; the pull consume loop additionally re-issues its request on a cadence so it resumes after a dropped connection.

Built on JetStream: Key-Value and Object Store.