
Replication & Sync

A CRDT primitive on its own guarantees that the same set of operations converges. Replication is the layer that makes sure every replica eventually holds that same set — despite messages arriving out of order, twice, or after a long offline gap. This package does it without a central server, a global lock, or full-state diffing: each replica keeps an append-only op log keyed by a version vector, and the two sides exchange only the operations the other is missing.

The pieces layer bottom-up, each built on the one before. OpLog stores ops and tracks a VersionVector; Replica wraps a log plus a Lamport clock and a causal buffer, and integrates local and remote ops into your domain state; the sync helpers turn version vectors and op batches into bytes for any transport.

The convergence model

Every operation carries a globally-unique OpId — a per-site Lamport clock value tagged with the site that produced it ({ site, clock }). Two facts make replication work, and both flow from that id:

  • Identity ⇒ idempotence. Because an op's id is stable, a replica can tell whether it has already seen an op and apply it at most once. Delivering the same op twice is a no-op, so duplicate or replayed messages are harmless.
  • Determinism ⇒ commutativity. Concurrent ops are resolved by one shared tie-break — compareOpId (higher clock wins, site id breaks ties). Since every replica agrees on it, the order ops arrive in doesn't change the final state.

Replication therefore reduces to a set-reconciliation problem: get both replicas to the same set of ops. Convergence of the resulting state is the primitive's job; getting the ops there efficiently is this layer's.
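To make the tie-break concrete, here is a minimal sketch of a comparator with the shape described above. It is an illustration, not the library's compareOpId: the name `compareIds`, the sign convention, and the use of plain string comparison for site ids are all assumptions.

```typescript
// Illustrative sketch only. `compareIds` is a hypothetical stand-in,
// not the library's compareOpId.
interface SketchOpId {
  site: string;
  clock: number;
}

// Returns > 0 when `x` wins, < 0 when `y` wins, 0 when both are the same id.
// Assumption: higher clock wins; equal clocks fall back to comparing site ids as strings.
function compareIds(x: SketchOpId, y: SketchOpId): number {
  if (x.clock !== y.clock) return x.clock - y.clock;
  if (x.site === y.site) return 0;
  return x.site < y.site ? -1 : 1;
}

// Two replicas receive the same three ids in different orders...
const arrivalAtA: SketchOpId[] = [
  { site: 'a', clock: 2 },
  { site: 'b', clock: 2 },
  { site: 'a', clock: 1 },
];
const arrivalAtB: SketchOpId[] = [
  { site: 'b', clock: 2 },
  { site: 'a', clock: 1 },
  { site: 'a', clock: 2 },
];

// ...but sorting with the shared comparator yields one agreed order.
const orderAtA = arrivalAtA.slice().sort(compareIds).map(id => `${id.site}@${id.clock}`);
const orderAtB = arrivalAtB.slice().sort(compareIds).map(id => `${id.site}@${id.clock}`);
// Both are ['a@1', 'a@2', 'b@2'].
```

Because the comparator is a total order over ids, any two replicas that hold the same set of ids derive the same ordering, whatever the arrival sequence was.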

Version vectors: "what have you seen?"

A VersionVector is the compact summary of everything a replica has observed — a map from site id to the highest clock seen for that site. It relies on one assumption: each site emits dense clocks (1, 2, 3, …), with no gaps. That's what lets a single number per site stand in for a whole set: if a replica has seen a@5, it has necessarily seen a@1…a@4 too. So has(id) is just get(id.site) >= id.clock — an O(1) check, no per-op bookkeeping.

Two operations follow directly. Dedup: an incoming op whose id the vector already covers can be ignored. Delta: given a remote vector, the set of ops the remote lacks is exactly those whose id the vector does not cover.
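The coverage check is small enough to sketch in full. The following is a hypothetical minimal version vector built on a plain object; the helper names `covers` and `observe` are illustrative and are not the library's VersionVector API.

```typescript
// Hypothetical minimal version vector: site id -> highest clock seen.
// A sketch of the idea, not the library's VersionVector class.
type SketchVector = Record<string, number>;

interface VvOpId {
  site: string;
  clock: number;
}

// O(1) coverage check; a site that was never seen counts as clock 0.
function covers(vector: SketchVector, id: VvOpId): boolean {
  return (vector[id.site] ?? 0) >= id.clock;
}

// Record an observed id by raising that site's high-water mark.
function observe(vector: SketchVector, id: VvOpId): void {
  vector[id.site] = Math.max(vector[id.site] ?? 0, id.clock);
}

const seenSoFar: SketchVector = {};
observe(seenSoFar, { site: 'a', clock: 1 });
observe(seenSoFar, { site: 'a', clock: 2 });
observe(seenSoFar, { site: 'b', clock: 1 });

// Dedup: a@2 is already covered, so an incoming copy can be ignored.
const isDuplicate = covers(seenSoFar, { site: 'a', clock: 2 }); // true
// Delta: b@2 is not covered, so a peer holding it should send it.
const isMissing = !covers(seenSoFar, { site: 'b', clock: 2 }); // true
```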

The op log

OpLog is an append-only list of operations paired with a version vector. It is deliberately domain-agnostic: the only field it reads is id (the HasOpId constraint), so the same log stores RGA inserts, LWW writes, mark spans, or anything else you give it.

ts
import { OpLog } from '@robonen/crdt';

// The op log only ever reads `id` — the rest of the op is your domain payload.
interface CharOp {
  id: { site: string; clock: number };
  originLeft: { site: string; clock: number } | null;
  value: string;
}

const log = new OpLog<CharOp>();

log.append(op);          // true if new, false if already seen (dedup by id)
log.has(op.id);          // version vector lookup, not a linear scan
log.version;             // VersionVector — the highest clock seen per site
log.all();               // every op, in append order
log.delta(remoteVector); // ops the remote (described by its vector) lacks

append consults the vector first and returns false if the op is a duplicate, so the log never stores the same id twice. delta(remote) walks the log once and keeps every op the remote vector hasn't covered — this is the heart of "exchange only the delta".
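The append and delta behaviour described above can be sketched as a toy log. `SketchLog` is a hypothetical class written for illustration; the real OpLog may store and filter ops differently.

```typescript
// Hypothetical sketch of an append-only log paired with a version vector.
// `SketchLog` is illustrative; it is not the library's OpLog implementation.
interface LoggedOp {
  id: { site: string; clock: number };
}

class SketchLog<T extends LoggedOp> {
  private readonly ops: T[] = [];
  private readonly vector: Record<string, number> = {};

  // Returns true if the op was new, false if the vector already covers its id.
  append(op: T): boolean {
    if ((this.vector[op.id.site] ?? 0) >= op.id.clock) return false;
    this.ops.push(op);
    this.vector[op.id.site] = op.id.clock;
    return true;
  }

  // One pass over the log: keep the ops the remote vector does not cover.
  delta(remote: Record<string, number>): T[] {
    return this.ops.filter(op => (remote[op.id.site] ?? 0) < op.id.clock);
  }
}

const sketchLog = new SketchLog<LoggedOp & { value: string }>();
const firstAppend = sketchLog.append({ id: { site: 'a', clock: 1 }, value: 'H' }); // true
const repeatAppend = sketchLog.append({ id: { site: 'a', clock: 1 }, value: 'H' }); // false
sketchLog.append({ id: { site: 'a', clock: 2 }, value: 'i' });

// A remote that has seen a@1 is missing only a@2.
const missingValues = sketchLog.delta({ a: 1 }).map(op => op.value); // ['i']
```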

ts
// A tells B: "here's everything I've seen" (a state vector).
const aHas = a.replica.version;

// B answers with exactly the ops A is missing, nothing more.
const patch = b.replica.delta(aHas); // OpLog.delta filters by the vector

// A integrates them; ids it already has are silently dropped.
a.replica.receive(patch);

The replica

Replica ties everything together. It owns a LamportClock, an OpLog, and a pending buffer, and you give it a single handler — integrate(op) — that applies an op to your domain state and returns false when the op's causal dependencies aren't present yet.

Producing local ops

Call nextId() to tick the clock and mint a fresh, causally-later OpId, build your op around it, then hand it to commitLocal(op). That logs it, integrates it into local state, and notifies onUpdate listeners with origin 'local'. Because nextId advances a Lamport clock that also tracks observed remote ops, locally-generated ids are always ordered after everything the replica has seen.

Receiving remote ops

receive(ops) is the inbound path. For each op it advances the clock past the remote id (clock.observe), skips anything already logged or already buffered, then drains the buffer — integrating whatever is now causally ready, retrying until no further progress is possible. It returns the ops it actually applied (in apply order) and notifies listeners with origin 'remote'.

Computing a delta

delta(remoteVector) forwards to the log: the ops this replica holds that the remote, described by its version, has not seen. The whole round-trip is two deltas — one per direction.

The canonical round-trip

Here is the README's converging-string example expanded end to end. Two replicas type concurrently, then each side sends the other exactly the ops it lacks. After both deltas, they hold the identical op set and therefore the identical string.

ts
import { Replica, Rga } from '@robonen/crdt';
import type { OpId } from '@robonen/crdt';

interface CharOp {
  id: OpId;
  originLeft: OpId | null;
  value: string;
}

// Each site owns an RGA (the sequence state) behind a Replica
// (clock + op log + causal buffer + delta sync).
function makeReplica(site: string) {
  const rga = new Rga<string>();
  const replica = new Replica<CharOp>(
    { integrate: op => rga.integrateInsert(op.id, op.value, op.originLeft) },
    site,
  );
  return { rga, replica };
}

function type(peer: ReturnType<typeof makeReplica>, text: string): void {
  let left: OpId | null = null;
  for (const ch of text) {
    const id = peer.replica.nextId();              // tick the Lamport clock
    peer.replica.commitLocal({ id, originLeft: left, value: ch });
    left = id;
  }
}

const a = makeReplica('a');
const b = makeReplica('b');

// Concurrent, independent edits — neither has seen the other.
type(a, 'Hi');
type(b, 'Yo');

// Exchange ONLY the delta each side is missing, in both directions.
b.replica.receive(a.replica.delta(b.replica.version));
a.replica.receive(b.replica.delta(a.replica.version));

a.rga.toArray().join('') === b.rga.toArray().join(''); // true — converged
a.rga.length; // 4

Note the asymmetry that makes this efficient: a.replica.delta(b.replica.version) is computed against B's vector, so it returns only what B is missing — not A's entire history. On a long document this is the difference between sending two characters and re-sending the whole file.

Why the order of the two deltas is irrelevant

You could swap the two receive lines, run them repeatedly, or interleave them with more edits — the result is the same. Each side only ever adds ops it hasn't seen, and compareOpId places each op in its deterministic position regardless of arrival order. That is convergence, and the property tests assert it across randomized schedules.

Causal buffering

Some ops can't be applied the instant they arrive. An RGA insert references an originLeft — the element it goes after — and a delete references the element it tombstones. If that target hasn't been integrated yet (a later op overtook an earlier one in transit), the insert has nowhere to anchor.

The handler signals this by returning false from integrate: Rga's integrateInsert returns false when its origin is absent, and integrateDelete returns false when its target is unknown. Replica.receive treats false as "not ready yet": it keeps the op in a pending buffer and retries the buffer every time new ops land, so the op integrates as soon as its dependency arrives. Nothing is lost; nothing is applied prematurely.

ts
const a = makeReplica('a');
type(a, 'ab'); // two ops; the 2nd inserts after (depends on) the 1st

const b = makeReplica('b');
const [op1, op2] = a.replica.delta(b.replica.version);

// Deliver the DEPENDENT op first. Its origin (op1) isn't present,
// so integrate() returns false and the replica buffers it.
b.replica.receive([op2]);
b.rga.toArray().join(''); // '' — nothing applied yet

// Now deliver the dependency. op1 integrates, which unblocks op2;
// drain() loops until no further progress is possible.
b.replica.receive([op1]);
b.rga.toArray().join(''); // 'ab'

Internally the drain loop sweeps the buffer repeatedly: each successful integration may unblock another buffered op, so it keeps looping while it makes progress. This is why a single receive of a batch delivered in any order still settles to the right state — the buffer absorbs the disorder.
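A drain loop of this kind can be sketched independently of the library. The code below is a hypothetical illustration: `drainBuffer`, `PendingOp`, and the toy dependency model are assumptions, not Replica's internals.

```typescript
// Hypothetical sketch of a causal buffer drain loop; not the library's Replica internals.
// `integrate` returns false while an op's dependency is still missing.
interface PendingOp {
  key: string;
  dependsOn: string | null;
}

function drainBuffer(pending: PendingOp[], integrate: (op: PendingOp) => boolean): PendingOp[] {
  const applied: PendingOp[] = [];
  let progressed = true;
  // Keep sweeping while the previous sweep integrated at least one op.
  while (progressed) {
    progressed = false;
    for (let i = 0; i < pending.length; ) {
      const candidate = pending[i];
      if (candidate !== undefined && integrate(candidate)) {
        pending.splice(i, 1); // removed in place, so do not advance i
        applied.push(candidate);
        progressed = true;
      } else {
        i += 1;
      }
    }
  }
  return applied;
}

// Toy state: an op integrates once its dependency key is present.
const integratedKeys: string[] = [];
const tryIntegrate = (op: PendingOp): boolean => {
  if (op.dependsOn !== null && integratedKeys.indexOf(op.dependsOn) === -1) return false;
  integratedKeys.push(op.key);
  return true;
};

// Delivered in reverse causal order: c depends on b, b depends on a.
const pendingOps: PendingOp[] = [
  { key: 'c', dependsOn: 'b' },
  { key: 'b', dependsOn: 'a' },
  { key: 'a', dependsOn: null },
];
const appliedKeys = drainBuffer(pendingOps, tryIntegrate).map(op => op.key);
// appliedKeys is ['a', 'b', 'c'] and pendingOps is now empty.
```

The loop terminates because every sweep either removes at least one op from the buffer or ends the loop. An op whose dependency never arrives simply stays in the buffer for the next call.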

Transport-agnostic wire encoding

The sync module is the only part that touches bytes, and it stays small on purpose. There are two things to put on the wire — a version vector (the "what do you have?" handshake) and a batch of ops (the delta or a full snapshot) — and a helper for each direction:

  • encodeStateVector / decodeStateVector — a VersionVector ⇄ Uint8Array.
  • encodeOps / decodeOps — an op batch (the delta or a full snapshot) ⇄ Uint8Array.
  • encodeJson / decodeJson — the lower-level pair the others build on.

The v1 format is JSON encoded to bytes — simple and debuggable. A compact varint format is a later optimization that changes the bytes, not the API, so code written against these functions keeps working. Because the result is just a Uint8Array, the transport is entirely up to you: WebSocket, HTTP, BroadcastChannel, a file on disk.

ts
import {
  encodeStateVector, decodeStateVector,
  encodeOps, decodeOps,
} from '@robonen/crdt';

// --- Peer A: announce what I have ---
const myVector: Uint8Array = encodeStateVector(a.replica.version);
socket.send(myVector); // send over WebSocket, HTTP, BroadcastChannel, …

// --- Peer B: answer with the delta A is missing ---
const remoteVector = decodeStateVector(received);
const patch: Uint8Array = encodeOps(b.replica.delta(remoteVector));
socket.send(patch);

// --- Peer A: apply the patch ---
const ops = decodeOps<CharOp>(receivedPatch);
a.replica.receive(ops);
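The "JSON encoded to bytes" idea behind the v1 format can be approximated with the standard TextEncoder and TextDecoder. This is a sketch under that assumption only: `toBytes` and `fromBytes` are hypothetical names, and the library's encodeJson / decodeJson may frame or validate data differently.

```typescript
// Hypothetical sketch of a JSON-to-bytes codec using the standard TextEncoder/TextDecoder.
// `toBytes` / `fromBytes` are illustrative names, not the library's encodeJson / decodeJson.
function toBytes(value: unknown): Uint8Array {
  return new TextEncoder().encode(JSON.stringify(value));
}

function fromBytes<T>(bytes: Uint8Array): T {
  // The cast is unchecked: the caller asserts the shape of the decoded value.
  return JSON.parse(new TextDecoder().decode(bytes)) as T;
}

// Round-trip a vector-shaped object...
const vectorBytes = toBytes({ a: 2, b: 1 });
const vectorBack = fromBytes<Record<string, number>>(vectorBytes);

// ...and a one-op batch.
const batchBytes = toBytes([{ id: { site: 'a', clock: 1 }, originLeft: null, value: 'H' }]);
const batchBack = fromBytes<Array<{ id: { site: string; clock: number }; value: string }>>(batchBytes);
```

Since the decoded value comes from the network, a production decoder would likely validate its shape before handing ops to receive.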

A minimal two-way protocol

Put the pieces together and a full reconciliation between two peers takes four messages, one vector and one delta in each direction:

  1. Each peer sends its encodeStateVector(replica.version).
  2. On receiving the other's vector, each peer replies with encodeOps(replica.delta(theirVector)).
  3. Each peer receive()s the decoded delta.
  4. Both replicas now hold the same op set — and the same converged state.
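The steps above can be simulated in memory without any transport. Everything in this sketch is hypothetical (`SketchPeer`, `send`, and the plain-object vectors); it stands in for Replica and the sync helpers only to show the message flow.

```typescript
// Hypothetical in-memory simulation of the handshake. `SketchPeer` and `send`
// are illustrative helpers, not part of the library.
interface WireOp {
  id: { site: string; clock: number };
  value: string;
}
type WireVector = Record<string, number>;

class SketchPeer {
  readonly ops: WireOp[] = [];

  // Highest clock held per site.
  vector(): WireVector {
    const result: WireVector = {};
    for (const op of this.ops) {
      result[op.id.site] = Math.max(result[op.id.site] ?? 0, op.id.clock);
    }
    return result;
  }

  // Ops this peer holds that the remote vector does not cover.
  deltaFor(remote: WireVector): WireOp[] {
    return this.ops.filter(op => (remote[op.id.site] ?? 0) < op.id.clock);
  }

  // Add only ops that are not already covered.
  receive(batch: WireOp[]): void {
    const mine = this.vector();
    for (const op of batch) {
      if ((mine[op.id.site] ?? 0) < op.id.clock) {
        this.ops.push(op);
        mine[op.id.site] = op.id.clock;
      }
    }
  }
}

// Stand-in for a transport: counts messages and copies the payload via JSON.
let messageCount = 0;
function send<T>(payload: T): T {
  messageCount += 1;
  return JSON.parse(JSON.stringify(payload)) as T;
}

const peerA = new SketchPeer();
const peerB = new SketchPeer();
peerA.ops.push({ id: { site: 'a', clock: 1 }, value: 'H' }, { id: { site: 'a', clock: 2 }, value: 'i' });
peerB.ops.push({ id: { site: 'b', clock: 1 }, value: 'Y' });

// Step 1: each peer announces its vector.
const vectorFromA = send(peerA.vector());
const vectorFromB = send(peerB.vector());
// Step 2: each peer answers with the delta the other lacks.
const deltaForA = send(peerB.deltaFor(vectorFromA));
const deltaForB = send(peerA.deltaFor(vectorFromB));
// Step 3: each peer applies what it received.
peerA.receive(deltaForA);
peerB.receive(deltaForB);
// Step 4: both sides hold the same set of op ids.
const idsOf = (peer: SketchPeer): string =>
  peer.ops.map(op => `${op.id.site}@${op.id.clock}`).sort().join(',');
```

Here `idsOf(peerA)` and `idsOf(peerB)` both evaluate to `'a@1,a@2,b@1'`, and `messageCount` ends at 4.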

This generalizes cleanly. For live collaboration, also forward each locally-committed op as it happens (subscribe with onUpdate, encode the op, broadcast it); peers that receive an op out of causal order simply buffer it. For catch-up after an offline gap, the state-vector handshake above replays exactly the missed ops. The same machinery covers both.

Dense clocks are a precondition

Version vectors assume each site's clocks are dense (1, 2, 3, …). That holds automatically when ids come from Replica.nextId(). If you mint ids yourself, never skip a value for a site — a gap would make delta believe a missing op was already delivered.
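A small illustration of the failure mode, using a hypothetical plain-object vector rather than the library's VersionVector: a site that skips clock 2 and only mints x@2 later will have that op mistaken for one already delivered.

```typescript
// Hypothetical illustration of why gaps break the one-number-per-site summary.
const gapVector: Record<string, number> = {};

const recordSeen = (site: string, clock: number): void => {
  gapVector[site] = Math.max(gapVector[site] ?? 0, clock);
};
const looksDelivered = (site: string, clock: number): boolean =>
  (gapVector[site] ?? 0) >= clock;

// Site 'x' mints ids by hand and skips clock 2: the receiver sees x@1, then x@3.
recordSeen('x', 1);
recordSeen('x', 3);

// If the site later mints x@2, the vector already "covers" it, so a dedup check
// would drop it and a delta computed against this vector would never include it.
const lateOpLooksDelivered = looksDelivered('x', 2); // true, although x@2 never arrived
```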

Where to next

  • Replica — the full API reference for commitLocal, receive, delta, and onUpdate.
  • OpLog and VersionVector — the storage and causality primitives underneath.
  • Playground — watch two replicas diverge and reconcile live in the browser.