Object Store

An Object Store bucket is an opinionated JetStream stream (OBJ_<bucket>, subjects $O.<bucket>.C.> for chunks and $O.<bucket>.M.> for per-object meta). It stores arbitrarily large binary objects by chunking them across the stream, with a rolled-up meta message recording each object's size, chunk count, and SHA-256 digest. Both put and get are fully streaming: neither materializes a whole object in memory.
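The chunk-and-digest bookkeeping described above can be pictured with a small standard-library sketch. It is not the library's implementation: SketchMeta, chunkAndDigest, and the publishChunk callback are hypothetical names, and the hex digest format is chosen only for readability. What it shows is that size, chunk count, and SHA-256 can all be accumulated one chunk at a time.

```scala
import java.security.MessageDigest

// Hypothetical stand-in for the meta record (size, chunk count, digest).
final case class SketchMeta(name: String, size: Long, chunks: Int, sha256Hex: String)

// Consumes the input block by block. Each block is split into pieces of at
// most maxChunkSize bytes; every piece is handed to publishChunk and folded
// into the running digest, so the whole object is never held in memory.
// Pieces are not re-packed across block boundaries in this sketch.
def chunkAndDigest(
    name: String,
    input: Iterator[Array[Byte]],
    maxChunkSize: Int
)(publishChunk: Array[Byte] => Unit): SketchMeta =
  require(maxChunkSize > 0, "maxChunkSize must be positive")
  val digest = MessageDigest.getInstance("SHA-256")
  var size   = 0L
  var chunks = 0
  for
    block <- input
    piece <- block.grouped(maxChunkSize)
  do
    publishChunk(piece)
    digest.update(piece)
    size += piece.length
    chunks += 1
  SketchMeta(name, size, chunks, digest.digest().map(b => "%02x".format(b)).mkString)
```

The meta record is produced only after the last chunk has been handed off, which is why it can carry totals for the whole object.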

The snippets below share these imports and helpers:

import cats.effect.IO
import fs2.Chunk
import fs2.io.file.{Files, Path}
import fs2.nats.client.NatsClient
import fs2.nats.objectstore.*

// A stand-in for wherever you send the bytes (a file, a socket, ...).
val sink: fs2.Pipe[IO, Byte, Unit] = _.map(_ => ())

Streaming put and get

put takes an fs2.Stream[F, Byte]; get returns an ObjectResult whose .data is an fs2.Stream[F, Byte]. The SHA-256 digest is verified once all chunks are read:

def objectStoreBasics(client: NatsClient[IO]): IO[Unit] =
  client.jetStream().use { js =>
    for
      os <- js.createObjectStore(ObjConfig(bucket = "assets"))

      // Stream bytes in (here from a file); nothing is buffered whole.
      info <- os.put(
                ObjectMeta("logo.png", maxChunkSize = 128 * 1024),
                Files[IO].readAll(Path("logo.png"))
              )

      // Stream bytes out; the digest is verified at end of stream.
      _ <- os.get("logo.png").flatMap {
             case Some(r) => r.data.through(sink).compile.drain
             case None    => IO.unit
           }

      // Convenience for small objects and files
      _   <- os.putBytes(ObjectMeta("readme.txt"), Chunk.array("hi".getBytes))
      txt <- os.getBytes("readme.txt")              // Option[Chunk[Byte]]
      _   <- os.putFile("backup.tar", Path("backup.tar"))
      _   <- os.getToFile("backup.tar", Path("restored.tar"))

      _   <- IO.println(s"stored ${info.size} bytes; readme present=${txt.isDefined}")
    yield ()
  }
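Because the digest is only checked once the last chunk has been read, a consumer may already have processed earlier bytes when a mismatch surfaces. The following standard-library sketch illustrates that behaviour under stated assumptions: verifying is a hypothetical helper, not part of the library, and the way the real get reports a mismatch is not shown in this document.

```scala
import java.security.MessageDigest

// Passes chunks through unchanged while folding them into a SHA-256 digest.
// The comparison against the expected digest happens only when the source
// is exhausted, i.e. at end of stream.
def verifying(chunks: Iterator[Array[Byte]], expectedSha256Hex: String): Iterator[Array[Byte]] =
  new Iterator[Array[Byte]]:
    private val digest  = MessageDigest.getInstance("SHA-256")
    private var checked = false

    def hasNext: Boolean =
      if chunks.hasNext then true
      else
        if !checked then
          checked = true
          val actual = digest.digest().map(b => "%02x".format(b)).mkString
          if actual != expectedSha256Hex then
            throw new IllegalStateException(
              s"digest mismatch: expected $expectedSha256Hex, got $actual"
            )
        false

    def next(): Array[Byte] =
      val chunk = chunks.next()
      digest.update(chunk)
      chunk
```

One consequence for callers: when writing to a destination that must never hold corrupt data, it is usually safer to write to a temporary location and commit only after the stream completes without error.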

Objects support links (addLink / addBucketLink, transparently resolved on get/info), metadata updates and rename (no re-upload), delete, list, watch (snapshot + EndOfData + live updates), and seal (make the bucket read-only):

def objectStoreAdmin(os: ObjectStore[IO]): IO[Unit] =
  for
    _    <- os.rename("old.txt", "new.txt")       // no re-upload
    list <- os.list.compile.toList                // live (non-deleted) objects
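    _    <- os.watch.use {
              // The watch keeps emitting live updates after the snapshot, so stop
              // at EndOfData; otherwise this step would never complete.
              _.takeThrough(_ != ObjectWatchEvent.EndOfData).evalMap {
                case ObjectWatchEvent.Update(i)  => IO.println(s"updated ${i.name}")
                case ObjectWatchEvent.EndOfData  => IO.println("caught up")
              }.compile.drain
            }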
    _    <- os.seal                                // make the bucket read-only
    _    <- IO.println(s"${list.size} live objects")
  yield ()
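Why rename needs no re-upload, and how a link is resolved, is easier to see in a toy in-memory model. This is a conceptual sketch only: Entry, Bucket, chunkKey, and maxHops are hypothetical names, and the model assumes that a meta entry refers to its chunks by a key independent of the object name.

```scala
// Toy model: meta entries either point at stored chunks or at another object.
enum Entry:
  case Data(chunkKey: String, size: Long) // meta pointing at stored chunks
  case Link(target: String)               // meta pointing at another object name

final case class Bucket(
    meta: Map[String, Entry],
    chunks: Map[String, Vector[Array[Byte]]]
):
  // Rename rewrites only the meta entry; the chunk map is left untouched.
  def rename(from: String, to: String): Bucket =
    meta.get(from) match
      case Some(e) => copy(meta = meta - from + (to -> e))
      case None    => this

  // A link is just another meta entry; no bytes are copied.
  def addLink(name: String, target: String): Bucket =
    copy(meta = meta + (name -> Entry.Link(target)))

  // Follows links until a data entry is found; maxHops guards against cycles.
  def resolve(name: String, maxHops: Int = 8): Option[Entry.Data] =
    meta.get(name) match
      case Some(d: Entry.Data)                => Some(d)
      case Some(Entry.Link(t)) if maxHops > 0 => resolve(t, maxHops - 1)
      case _                                  => None
```

In this model, renaming "old.txt" to "new.txt" and then linking "alias" to "new.txt" leaves the chunk map identical, and resolve("alias") returns the same data entry as resolve("new.txt").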

Reads of object meta use the JetStream Direct Get fast path when the bucket allows it; chunk reads use the gap-resetting ordered consumer, so a get that is interrupted by a reconnect resumes in order. Bucket management lives on the JetStream context: createObjectStore, objectStore, deleteObjectStore, objectStoreStatus, objectStoreNames.
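The idea behind a gap-resetting ordered read can be sketched without any NATS dependency. The model below is an assumption-laden illustration, not the library's consumer: Delivery, readInOrder, subscribeFrom, and maxResets are hypothetical names. Each subscription numbers its own deliveries from 1; when a delivery number is skipped, the reader discards that delivery and resubscribes from the stream sequence after the last one it accepted.

```scala
// One delivered message: its position in the stream, its position in the
// current subscription's delivery order, and its payload.
final case class Delivery(streamSeq: Long, consumerSeq: Long, payload: String)

// subscribeFrom(n) models (re)creating a consumer that starts at stream
// sequence n and numbers its deliveries 1, 2, 3, ...
def readInOrder(
    subscribeFrom: Long => Iterator[Delivery],
    firstSeq: Long,
    lastSeq: Long,
    maxResets: Int = 3
): Vector[String] =
  val out           = Vector.newBuilder[String]
  var nextStreamSeq = firstSeq
  var resets        = 0
  while nextStreamSeq <= lastSeq do
    if resets > maxResets then throw new IllegalStateException("too many consumer resets")
    val deliveries          = subscribeFrom(nextStreamSeq)
    var expectedConsumerSeq = 1L
    var gap                 = false
    while !gap && deliveries.hasNext do
      val d = deliveries.next()
      if d.consumerSeq == expectedConsumerSeq then
        out += d.payload
        nextStreamSeq = d.streamSeq + 1
        expectedConsumerSeq += 1
      else gap = true // a delivery went missing: drop this one and resubscribe
    resets += 1
  out.result()
```

Because the reader only ever accepts the delivery it expects next, the output order matches the stream order even if a subscription loses messages partway through.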

See also Authentication & TLS to secure the connection.