Real-time collaboration mechanics — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add an in-process pub/sub hub plus two new HTTP endpoints (GET /api/stream, POST /api/stream/focus) so two collaborators see Topic activity, agent-job state, and presence on the same Document within ~1 s.

Architecture: New internal/realtime package holds a goroutine-safe hub keyed by source_path. The SSE handler subscribes a client, writes a one-field subscribed handshake frame, activates the subscription for fanout, then streams events. Existing mutation handlers and the agent runner call Hub.Publish after their SQLite write commits — events are advisory; REST endpoints remain authoritative. No SQL migration.

Tech Stack: Go 1.x stdlib (net/http, http.Flusher, sync.RWMutex, channels, context), existing internal/auth, internal/collab, internal/agent packages. modernc.org/sqlite driver (unchanged).

Spec: ../specs/2026-05-13-realtime-collab-design.html. Decisions log: ../specs/2026-05-10-collaborative-annotations-decisions.md.


File structure

New:

Modified:

Deliberately not changed in this plan (owned by #8):


Conventions every task assumes


Task 1: Realtime package skeleton — Event, Principal, ErrHubClosed

Files:

This task establishes the types the rest of the plan refers to. No behaviour yet — just enough surface for a compile-time check.

// internal/realtime/hub_test.go
package realtime_test

import (
	"errors"
	"testing"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

func TestErrHubClosed_isSentinel(t *testing.T) {
	if !errors.Is(realtime.ErrHubClosed, realtime.ErrHubClosed) {
		t.Fatal("ErrHubClosed must satisfy errors.Is against itself")
	}
	if realtime.ErrHubClosed.Error() == "" {
		t.Fatal("ErrHubClosed must have a non-empty message")
	}
}

func TestEvent_zeroValueCompiles(t *testing.T) {
	var e realtime.Event
	_ = e.Type
	_ = e.Payload
}

func TestPrincipal_zeroValueCompiles(t *testing.T) {
	var p realtime.Principal
	_ = p.UserID
	_ = p.DisplayName
	_ = p.SessionID
}
go test ./internal/realtime/ -run TestErrHubClosed -v

Expected: FAIL with no Go files in internal/realtime (the package doesn't exist yet).

// internal/realtime/hub.go
package realtime

import "errors"

// Event is a single SSE frame published to subscribers. Payload must marshal
// to JSON; the SSE handler encodes it inline.
type Event struct {
	Type    string
	Payload any
}

// Principal carries the identity of the SSE-stream consumer. Lives here (not
// imported from internal/auth) so the realtime package stays independent of
// auth's package shape. The SSE handler maps from auth.Principal +
// auth.SessionInfo into this struct on subscribe.
type Principal struct {
	UserID      string
	DisplayName string
	SessionID   string
}

// ErrHubClosed is returned by Hub.Subscribe after Hub.Close has been called.
var ErrHubClosed = errors.New("realtime: hub closed")
go test ./internal/realtime/ -v

Expected: PASS (three tests).

git add internal/realtime/
git commit -m "wiki-browser: realtime — package skeleton (Event, Principal, ErrHubClosed)"

Task 2: Hub with Subscribe / Activate / Publish and one-way fanout

The Subscribe/Activate split is load-bearing: it guarantees the SSE handler can write the subscribed handshake frame before any other event reaches the channel.

Files:

Append to internal/realtime/hub_test.go:

import (
	// ... keep existing imports ...
	"sync"
	"time"
)

func newPrincipal(userID, session string) realtime.Principal {
	return realtime.Principal{UserID: userID, DisplayName: userID, SessionID: session}
}

func drain(t *testing.T, ch <-chan realtime.Event, want int, timeout time.Duration) []realtime.Event {
	t.Helper()
	got := make([]realtime.Event, 0, want)
	deadline := time.After(timeout)
	for len(got) < want {
		select {
		case ev, ok := <-ch:
			if !ok {
				return got
			}
			got = append(got, ev)
		case <-deadline:
			t.Fatalf("timed out waiting for %d events, got %d: %+v", want, len(got), got)
		}
	}
	return got
}

func TestHub_PublishReachesActivatedSubscriber(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	sub, err := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	if err != nil {
		t.Fatal(err)
	}
	hub.Activate(sub)
	hub.Publish("a.md", realtime.Event{Type: "topic.created", Payload: map[string]string{"topic_id": "t1"}})
	evts := drain(t, sub.Events(), 1, 100*time.Millisecond)
	if evts[0].Type != "topic.created" {
		t.Fatalf("got %q, want topic.created", evts[0].Type)
	}
}

func TestHub_PublishBeforeActivateIsDropped(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	hub.Publish("a.md", realtime.Event{Type: "topic.created"})
	// No Activate — the event must not be in the channel.
	select {
	case ev := <-sub.Events():
		t.Fatalf("unexpected event before Activate: %+v", ev)
	case <-time.After(20 * time.Millisecond):
		// good
	}
}

func TestHub_NoCrossSourceLeak(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	b, _ := hub.Subscribe("b.md", newPrincipal("u2", "s2"))
	hub.Activate(a)
	hub.Activate(b)
	hub.Publish("a.md", realtime.Event{Type: "topic.created"})
	got := drain(t, a.Events(), 1, 100*time.Millisecond)
	if got[0].Type != "topic.created" {
		t.Fatalf("a got %q", got[0].Type)
	}
	select {
	case ev := <-b.Events():
		t.Fatalf("b received cross-source event: %+v", ev)
	case <-time.After(20 * time.Millisecond):
	}
}

func TestHub_FanoutToMultipleSubscribersOnSameSource(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	b, _ := hub.Subscribe("a.md", newPrincipal("u2", "s2"))
	hub.Activate(a)
	hub.Activate(b)
	hub.Publish("a.md", realtime.Event{Type: "topic.created"})
	var wg sync.WaitGroup
	for _, s := range []*realtime.Subscriber{a, b} {
		wg.Add(1)
		go func(s *realtime.Subscriber) {
			defer wg.Done()
			drain(t, s.Events(), 1, 100*time.Millisecond)
		}(s)
	}
	wg.Wait()
}

func TestHub_SubscribeAfterCloseFails(t *testing.T) {
	hub := realtime.NewHub()
	hub.Close()
	if _, err := hub.Subscribe("a.md", newPrincipal("u1", "s1")); err == nil || err.Error() != realtime.ErrHubClosed.Error() {
		t.Fatalf("Subscribe after Close: err=%v, want ErrHubClosed", err)
	}
}
go test ./internal/realtime/ -v

Expected: build failure (undefined: realtime.NewHub, realtime.Subscriber, etc.).

Replace internal/realtime/hub.go with:

// internal/realtime/hub.go
package realtime

import (
	"errors"
	"sync"

	"github.com/google/uuid"
)

// Event is a single SSE frame published to subscribers.
type Event struct {
	Type    string
	Payload any
}

// Principal carries the identity of the SSE-stream consumer.
type Principal struct {
	UserID      string
	DisplayName string
	SessionID   string
}

// ErrHubClosed is returned by Subscribe after Close has been called.
var ErrHubClosed = errors.New("realtime: hub closed")

// subscriberBufferSize is the per-subscriber event channel capacity. See the
// "Backpressure" section of the design spec for the rationale (64 absorbs the
// 3-event burst of an Agent incorporation plus prior running transitions and
// presence churn, with ample headroom; 16 KiB per subscriber stays trivial on
// the Pi).
const subscriberBufferSize = 64

// Hub is the in-process pub/sub fanout. Goroutine-safe. Keyed by source_path.
type Hub struct {
	mu      sync.RWMutex
	closed  bool
	sources map[string]map[*Subscriber]struct{}
}

// Subscriber represents one live SSE stream. Owned by the calling handler;
// must be closed when the handler returns.
type Subscriber struct {
	id     string
	source string
	princ  Principal

	mu             sync.Mutex
	focusedTopicID string
	events         chan Event
	closed         bool
	activated      bool

	hub *Hub
}

func NewHub() *Hub {
	return &Hub{sources: map[string]map[*Subscriber]struct{}{}}
}

// Subscribe allocates a Subscriber for sourcePath but does NOT register it for
// fanout. The caller writes a `subscribed` handshake frame onto the SSE
// stream, then calls Activate. Splitting the two operations guarantees
// `subscribed` is the first frame on the wire — concurrent Publish calls on
// the same source cannot deliver into this Subscriber's channel until
// Activate runs.
func (h *Hub) Subscribe(sourcePath string, p Principal) (*Subscriber, error) {
	h.mu.RLock()
	closed := h.closed
	h.mu.RUnlock()
	if closed {
		return nil, ErrHubClosed
	}
	return &Subscriber{
		id:     uuid.NewString(),
		source: sourcePath,
		princ:  p,
		events: make(chan Event, subscriberBufferSize),
		hub:    h,
	}, nil
}

// Activate joins the subscriber to the source's fanout set.
func (h *Hub) Activate(s *Subscriber) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.closed {
		return
	}
	s.mu.Lock()
	s.activated = true
	s.mu.Unlock()
	set := h.sources[s.source]
	if set == nil {
		set = map[*Subscriber]struct{}{}
		h.sources[s.source] = set
	}
	set[s] = struct{}{}
}

// Publish fans an event to every activated subscriber on sourcePath. Non-
// blocking: if a subscriber's buffer is full, the hub asynchronously closes
// that subscriber and the SSE handler will return + the client will reconnect
// + refetch via REST.
func (h *Hub) Publish(sourcePath string, evt Event) {
	h.mu.RLock()
	set := h.sources[sourcePath]
	subs := make([]*Subscriber, 0, len(set))
	for s := range set {
		subs = append(subs, s)
	}
	h.mu.RUnlock()
	for _, s := range subs {
		s.deliver(evt)
	}
}

// Close shuts down the hub. After Close, Subscribe returns ErrHubClosed.
// Existing subscribers' channels are closed.
func (h *Hub) Close() {
	h.mu.Lock()
	if h.closed {
		h.mu.Unlock()
		return
	}
	h.closed = true
	subs := []*Subscriber{}
	for _, set := range h.sources {
		for s := range set {
			subs = append(subs, s)
		}
	}
	h.sources = map[string]map[*Subscriber]struct{}{}
	h.mu.Unlock()
	for _, s := range subs {
		s.closeChan()
	}
}

// ID returns the server-generated subscriber_id sent in the `subscribed` event.
func (s *Subscriber) ID() string { return s.id }

// Source returns the source_path this subscription is bound to.
func (s *Subscriber) Source() string { return s.source }

// Principal returns the bound Principal (read-only).
func (s *Subscriber) Principal() Principal { return s.princ }

// Events is the read-only stream the SSE handler ranges over.
func (s *Subscriber) Events() <-chan Event { return s.events }

// Close removes the subscriber from the hub's fanout set and closes its
// channel. Idempotent. Safe to call from the SSE handler's defer.
func (s *Subscriber) Close() {
	s.hub.removeSubscriber(s)
}

func (s *Subscriber) deliver(evt Event) {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return
	}
	select {
	case s.events <- evt:
		s.mu.Unlock()
	default:
		// Buffer full → slow consumer. Close and let the SSE handler exit;
		// the client reconnects + refetches via REST.
		s.mu.Unlock()
		go s.hub.removeSubscriber(s)
	}
}

func (s *Subscriber) closeChan() {
	s.mu.Lock()
	if s.closed {
		s.mu.Unlock()
		return
	}
	s.closed = true
	close(s.events)
	s.mu.Unlock()
}

func (h *Hub) removeSubscriber(s *Subscriber) {
	h.mu.Lock()
	if set, ok := h.sources[s.source]; ok {
		delete(set, s)
		if len(set) == 0 {
			delete(h.sources, s.source)
		}
	}
	h.mu.Unlock()
	s.closeChan()
}
go test ./internal/realtime/ -v

Expected: all tests pass.

git add internal/realtime/
git commit -m "wiki-browser: realtime — hub with Subscribe/Activate fanout"

Task 3: Backpressure — slow consumer dropped at buffer overflow

Files:

The hub already implements drop-on-full in Task 2's deliver. This task tests the contract explicitly.

Append to internal/realtime/hub_test.go:

func TestHub_SlowConsumerDropped(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	hub.Activate(sub)

	// Fill the buffer (64) without reading.
	for i := 0; i < 64; i++ {
		hub.Publish("a.md", realtime.Event{Type: "x"})
	}
	// One more should trigger backpressure → close.
	hub.Publish("a.md", realtime.Event{Type: "overflow"})

	// Drain whatever fit, then verify the channel closes.
	deadline := time.After(200 * time.Millisecond)
	gotClose := false
	for !gotClose {
		select {
		case _, ok := <-sub.Events():
			if !ok {
				gotClose = true
			}
		case <-deadline:
			t.Fatal("subscriber channel never closed after overflow")
		}
	}
}

func TestHub_PublishNeverBlocks(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	hub.Activate(sub)
	done := make(chan struct{})
	go func() {
		for i := 0; i < 1000; i++ {
			hub.Publish("a.md", realtime.Event{Type: "x"})
		}
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(500 * time.Millisecond):
		t.Fatal("Publish blocked on a slow subscriber")
	}
}
go test ./internal/realtime/ -run 'TestHub_(SlowConsumerDropped|PublishNeverBlocks)' -v

Expected: both pass (impl from Task 2 already satisfies them). If either fails, fix the implementation before continuing — backpressure is correctness, not nice-to-have.

git add internal/realtime/hub_test.go
git commit -m "wiki-browser: realtime — backpressure tests"

Task 4: SetFocus and presence recompute on subscribe/activate/close

Files:

Presence is emitted as presence.updated events with payload {subscriptions: [...]} — one entry per active subscription on the source. The hub recomputes and republishes on every Activate, SetFocus, and removeSubscriber.

Append to hub_test.go:

type presencePayload struct {
	Subscriptions []presenceEntry `json:"subscriptions"`
}

type presenceEntry struct {
	SubscriberID   string `json:"subscriber_id"`
	UserID         string `json:"user_id"`
	DisplayName    string `json:"display_name"`
	FocusedTopicID string `json:"focused_topic_id,omitempty"`
}

func presenceFrom(t *testing.T, ev realtime.Event) presencePayload {
	t.Helper()
	if ev.Type != "presence.updated" {
		t.Fatalf("not presence.updated: %s", ev.Type)
	}
	p, ok := ev.Payload.(realtime.PresencePayload)
	if !ok {
		t.Fatalf("payload type = %T, want realtime.PresencePayload", ev.Payload)
	}
	out := presencePayload{}
	for _, s := range p.Subscriptions {
		out.Subscriptions = append(out.Subscriptions, presenceEntry{
			SubscriberID: s.SubscriberID, UserID: s.UserID,
			DisplayName: s.DisplayName, FocusedTopicID: s.FocusedTopicID,
		})
	}
	return out
}

func TestHub_PresenceEmittedOnActivate(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
	hub.Activate(a)
	evts := drain(t, a.Events(), 1, 100*time.Millisecond)
	p := presenceFrom(t, evts[0])
	if len(p.Subscriptions) != 1 || p.Subscriptions[0].UserID != "daniel@x" {
		t.Fatalf("snapshot = %+v", p)
	}
}

func TestHub_PresenceRecomputeOnSecondActivate(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
	hub.Activate(a)
	drain(t, a.Events(), 1, 100*time.Millisecond) // initial snapshot
	b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
	hub.Activate(b)
	// Both subscribers receive an updated snapshot.
	got := drain(t, a.Events(), 1, 100*time.Millisecond)
	pA := presenceFrom(t, got[0])
	if len(pA.Subscriptions) != 2 {
		t.Fatalf("after second Activate, a saw %d subs", len(pA.Subscriptions))
	}
	got = drain(t, b.Events(), 1, 100*time.Millisecond)
	pB := presenceFrom(t, got[0])
	if len(pB.Subscriptions) != 2 {
		t.Fatalf("after second Activate, b saw %d subs", len(pB.Subscriptions))
	}
}

func TestHub_SetFocusRepublishesPresence(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
	b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
	hub.Activate(a)
	hub.Activate(b)
	// Drain the two initial snapshots from both subscribers.
	drain(t, a.Events(), 2, 100*time.Millisecond)
	drain(t, b.Events(), 1, 100*time.Millisecond)

	a.SetFocus("topic-123")
	got := drain(t, b.Events(), 1, 100*time.Millisecond)
	p := presenceFrom(t, got[0])
	var aEntry presenceEntry
	for _, e := range p.Subscriptions {
		if e.UserID == "daniel@x" {
			aEntry = e
		}
	}
	if aEntry.FocusedTopicID != "topic-123" {
		t.Fatalf("after SetFocus, daniel's focus = %q, want topic-123", aEntry.FocusedTopicID)
	}
}

func TestHub_RemoveSubscriberRepublishesPresence(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
	b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
	hub.Activate(a)
	hub.Activate(b)
	drain(t, a.Events(), 2, 100*time.Millisecond)
	drain(t, b.Events(), 1, 100*time.Millisecond)
	a.Close()
	got := drain(t, b.Events(), 1, 100*time.Millisecond)
	p := presenceFrom(t, got[0])
	if len(p.Subscriptions) != 1 || p.Subscriptions[0].UserID != "max@y" {
		t.Fatalf("after a.Close, b saw %+v", p.Subscriptions)
	}
}
go test ./internal/realtime/ -v

Expected: build failure (undefined: realtime.PresencePayload, (*Subscriber).SetFocus).

Add to internal/realtime/hub.go:

// PresencePayload is the JSON-marshalable payload of presence.updated events.
// Each entry corresponds to one live subscription — two tabs from the same
// user produce two entries with the same user_id.
type PresencePayload struct {
	Subscriptions []PresenceEntry `json:"subscriptions"`
}

type PresenceEntry struct {
	SubscriberID   string `json:"subscriber_id"`
	UserID         string `json:"user_id"`
	DisplayName    string `json:"display_name"`
	FocusedTopicID string `json:"focused_topic_id,omitempty"`
}

Modify (*Hub).Activate to call presence recompute at the end:

func (h *Hub) Activate(s *Subscriber) {
	h.mu.Lock()
	if h.closed {
		h.mu.Unlock()
		return
	}
	s.mu.Lock()
	s.activated = true
	s.mu.Unlock()
	set := h.sources[s.source]
	if set == nil {
		set = map[*Subscriber]struct{}{}
		h.sources[s.source] = set
	}
	set[s] = struct{}{}
	h.mu.Unlock()
	h.publishPresence(s.source)
}

Modify (*Hub).removeSubscriber to publish after removal:

func (h *Hub) removeSubscriber(s *Subscriber) {
	h.mu.Lock()
	source := s.source
	if set, ok := h.sources[source]; ok {
		delete(set, s)
		if len(set) == 0 {
			delete(h.sources, source)
		}
	}
	h.mu.Unlock()
	s.closeChan()
	h.publishPresence(source)
}

Add SetFocus to Subscriber and publishPresence to Hub:

// SetFocus updates the subscription's focused Topic and triggers a presence
// republish on this source.
func (s *Subscriber) SetFocus(topicID string) {
	s.mu.Lock()
	s.focusedTopicID = topicID
	activated := s.activated
	s.mu.Unlock()
	if activated {
		s.hub.publishPresence(s.source)
	}
}

func (h *Hub) publishPresence(sourcePath string) {
	h.mu.RLock()
	set := h.sources[sourcePath]
	entries := make([]PresenceEntry, 0, len(set))
	subs := make([]*Subscriber, 0, len(set))
	for s := range set {
		s.mu.Lock()
		entries = append(entries, PresenceEntry{
			SubscriberID:   s.id,
			UserID:         s.princ.UserID,
			DisplayName:    s.princ.DisplayName,
			FocusedTopicID: s.focusedTopicID,
		})
		s.mu.Unlock()
		subs = append(subs, s)
	}
	h.mu.RUnlock()
	evt := Event{Type: "presence.updated", Payload: PresencePayload{Subscriptions: entries}}
	for _, s := range subs {
		s.deliver(evt)
	}
}

NOTE on ordering. The presence entries' order is unstable (map iteration). Tests must not rely on order — they filter by UserID. If a deterministic order is ever needed for the wire, sort by SubscriberID inside publishPresence — but v1 does not require it.

go test ./internal/realtime/ -v

Expected: all pass.

git add internal/realtime/
git commit -m "wiki-browser: realtime — presence model (per-subscription, recomputed)"

Task 5: Wire Realtime into Deps, cmd/wiki-browser/main.go, and agent.Service

This task gets the hub instantiated and threaded through the dependency graph so subsequent tasks can use it. Smoke test: the existing test suite still passes.

Files:

Append to internal/server/handler_doc_test.go:

import (
	// existing imports …
	"github.com/getorcha/wiki-browser/internal/realtime"
)

func TestNewTestServer_acceptsRealtimeHub(t *testing.T) {
	// Smoke check that passing a hub through Deps compiles and the existing
	// routes still respond. This is a regression guard for the wiring change.
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
	})
	resp, err := http.Get(ts.URL + "/")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		t.Fatalf("status = %d", resp.StatusCode)
	}
}
go test ./internal/server/ -run TestNewTestServer_acceptsRealtimeHub -v

Expected: build failure (testServerOptions.Realtime undefined, Deps.Realtime undefined).

Modify the Deps struct (around line 14):

type Deps struct {
	Title  string
	Root   string
	Walker *walker.Walker
	Index  *index.Index
	Cache  *render.Cache
	Collab *collab.Store

	Auth              *auth.Handlers
	SessionMiddleware func(http.Handler) http.Handler
	AuthDevMode       bool

	AgentService AgentService

	// Realtime is the in-process pub/sub hub for SSE. Nil disables
	// /api/stream + /api/stream/focus with 503.
	Realtime *realtime.Hub
}

Add the internal/realtime import to the imports block.

In internal/server/handler_doc_test.go, extend testServerOptions:

type testServerOptions struct {
	SessionMiddleware func(http.Handler) http.Handler
	AgentService      server.AgentService
	Realtime          *realtime.Hub
}

Pass it into server.Mux(...):

mux := server.Mux(server.Deps{
	// … existing fields …
	Realtime: opts.Realtime,
})

In internal/agent/service.go, extend ServiceConfig:

type ServiceConfig struct {
	// … existing fields …

	// Realtime is the hub the runner publishes job + proposal events on.
	// Nil disables event publishing (existing tests don't need it; production
	// always passes one).
	Realtime *realtime.Hub
}

Add internal/realtime import. Add a helper:

func (s *Service) publish(sourcePath string, evt realtime.Event) {
	if s.cfg.Realtime == nil {
		return
	}
	s.cfg.Realtime.Publish(sourcePath, evt)
}

(Tasks 9–11 will call this from (*Service).run.)

After collabStore is opened (around line 116) and before the agent service is constructed:

hub := realtime.NewHub()
defer hub.Close()

Defer-order note. run() returns through srv.Shutdown(...) in the signal-handler branch, then unwinds its deferred functions in LIFO order before returning to main. The hub must shut down after Shutdown drains in-flight SSE handlers — otherwise the handlers' defer sub.Close() would race a hub that's already torn down. Placing defer hub.Close() between collabStore open and agentSvc construction makes it later (closer to the top) than defer agentSvc.Stop(); LIFO means agentSvc.Stop() fires first (drains in-flight agent jobs that may still call Hub.Publish), then hub.Close(). That's the right order. Don't introduce a defer srv.Shutdown(...) — the existing select-branch path already calls it.

Add the import. Pass Realtime: hub into agent.NewService(agent.ServiceConfig{ ... }) and server.Mux(server.Deps{ ... }).

go test ./internal/server/ ./internal/agent/ ./internal/realtime/ -v
go vet ./...

Expected: all pass, no vet warnings.

make build

Expected: builds dist/wiki-browser and dist/wb-agent without errors.

git add internal/server/server.go internal/server/handler_doc_test.go internal/agent/service.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: wire realtime.Hub into Deps, agent.Service, main"

Task 6: GET /api/stream — happy path

Implement the SSE handler's happy path: validate source path, validate file exists, subscribe, write subscribed frame, activate, fan out events. Defer keepalive and session-touch handling to the next task.

Files:

Create internal/server/handler_stream_test.go:

package server_test

import (
	"bufio"
	"encoding/json"
	"net/http"
	"strings"
	"testing"
	"time"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

// sseEvent is one parsed SSE frame (event: + data: lines).
type sseEvent struct {
	Type string
	Data string
}

// openSSE opens the stream and returns a channel of parsed events plus a
// cancel func. The channel closes when the response body closes.
func openSSE(t *testing.T, url string) (<-chan sseEvent, func()) {
	t.Helper()
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		t.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	if resp.StatusCode != http.StatusOK {
		body, _ := bufio.NewReader(resp.Body).ReadString('\n')
		resp.Body.Close()
		t.Fatalf("stream status = %d body=%q", resp.StatusCode, body)
	}
	out := make(chan sseEvent, 32)
	go func() {
		defer close(out)
		// Body close is owned by the returned cancel func — closing here
		// would double-Close.
		scanner := bufio.NewScanner(resp.Body)
		scanner.Buffer(make([]byte, 64<<10), 1<<20)
		var ev sseEvent
		for scanner.Scan() {
			line := scanner.Text()
			switch {
			case line == "":
				if ev.Type != "" {
					out <- ev
					ev = sseEvent{}
				}
			case strings.HasPrefix(line, "event: "):
				ev.Type = strings.TrimPrefix(line, "event: ")
			case strings.HasPrefix(line, "data: "):
				ev.Data = strings.TrimPrefix(line, "data: ")
			case strings.HasPrefix(line, ":"):
				// comment — keepalive, ignored
			}
		}
	}()
	return out, func() { _ = resp.Body.Close() }
}

func waitForEvent(t *testing.T, ch <-chan sseEvent, wantType string, timeout time.Duration) sseEvent {
	t.Helper()
	deadline := time.After(timeout)
	for {
		select {
		case ev, ok := <-ch:
			if !ok {
				t.Fatalf("stream closed waiting for %s", wantType)
			}
			if ev.Type == wantType {
				return ev
			}
		case <-deadline:
			t.Fatalf("timeout waiting for %s", wantType)
		}
	}
}

func TestStream_subscribedFrameFirstThenPresence(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
	})

	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()

	// First frame must be `subscribed`.
	ev := <-ch
	if ev.Type != "subscribed" {
		t.Fatalf("first event = %q, want subscribed", ev.Type)
	}
	var sub struct {
		SubscriberID string `json:"subscriber_id"`
	}
	if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil {
		t.Fatalf("decode subscribed: %v", err)
	}
	if sub.SubscriberID == "" {
		t.Fatal("subscriber_id empty")
	}

	// Followed (in some order, but soon) by a presence.updated frame.
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
}

func TestStream_badSourcePathReturns400(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
	})
	resp, err := http.Get(ts.URL + "/api/stream?source_path=../escape.md")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusBadRequest {
		t.Fatalf("status = %d, want 400", resp.StatusCode)
	}
}

func TestStream_missingFileReturns404(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
	})
	resp, err := http.Get(ts.URL + "/api/stream?source_path=does-not-exist.md")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusNotFound {
		t.Fatalf("status = %d, want 404", resp.StatusCode)
	}
}

func TestStream_anonymousReturns401(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: nil,
		Realtime:          hub,
	})
	resp, err := http.Get(ts.URL + "/api/stream?source_path=a.md")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusUnauthorized {
		t.Fatalf("status = %d, want 401", resp.StatusCode)
	}
}

func TestStream_realtimeNilReturns503(t *testing.T) {
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          nil,
	})
	resp, err := http.Get(ts.URL + "/api/stream?source_path=a.md")
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusServiceUnavailable {
		t.Fatalf("status = %d, want 503", resp.StatusCode)
	}
}
go test ./internal/server/ -run TestStream -v

Expected: 404 on /api/stream (route not registered).

Create internal/server/handler_stream.go:

package server

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"path/filepath"

	"github.com/getorcha/wiki-browser/internal/auth"
	"github.com/getorcha/wiki-browser/internal/collab"
	"github.com/getorcha/wiki-browser/internal/realtime"
)

// handleStream implements GET /api/stream?source_path=…
//
// SSE protocol: one HTTP response, kept open. Frames are
//
//	event: <type>
//	data: <JSON payload>
//
// followed by a blank line. Keepalive uses `:keepalive` comment lines. The
// stream never emits `id:` lines — v1 has no replay log.
func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
	if d.Realtime == nil || d.Collab == nil {
		writeJSONError(w, http.StatusServiceUnavailable, "collab_unavailable")
		return
	}
	sourcePath := r.URL.Query().Get("source_path")
	if _, err := collab.ValidateSourcePath(sourcePath); err != nil {
		writeJSONError(w, http.StatusBadRequest, "bad_source_path")
		return
	}
	abs := filepath.Join(d.Root, sourcePath)
	if info, err := os.Stat(abs); err != nil || info.IsDir() {
		writeJSONError(w, http.StatusNotFound, "unknown_source")
		return
	}

	flusher, ok := w.(http.Flusher)
	if !ok {
		writeJSONError(w, http.StatusInternalServerError, "no_flusher")
		return
	}

	principal, _ := auth.PrincipalFrom(r.Context())
	sessionInfo, _ := auth.SessionFrom(r.Context())

	sub, err := d.Realtime.Subscribe(sourcePath, realtime.Principal{
		UserID:      principal.UserID,
		DisplayName: principal.DisplayName,
		SessionID:   sessionInfo.IDHash,
	})
	if err != nil {
		writeJSONError(w, http.StatusServiceUnavailable, "hub_closed")
		return
	}
	defer sub.Close()

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-store")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")
	w.WriteHeader(http.StatusOK)

	// Handshake — `subscribed` must be the first frame on the wire. Write
	// before Activate so Publish-from-elsewhere cannot race in ahead of it.
	if err := writeSSEEvent(w, "subscribed", map[string]string{"subscriber_id": sub.ID()}); err != nil {
		return
	}
	flusher.Flush()

	d.Realtime.Activate(sub)

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-sub.Events():
			if !ok {
				return
			}
			if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

// writeSSEEvent serialises one SSE frame. `event:` + `data:` only, never `id:`
// — the spec forbids `id:` because v1 has no replay log.
func writeSSEEvent(w http.ResponseWriter, evType string, payload any) error {
	if _, err := fmt.Fprintf(w, "event: %s\n", evType); err != nil {
		return err
	}
	b, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal event %s: %w", evType, err)
	}
	if _, err := fmt.Fprintf(w, "data: %s\n\n", string(b)); err != nil {
		return err
	}
	return nil
}

In Mux(...), after the /api/agent/jobs/{id} registration:

mux.Handle("GET /api/stream",
	d.withSession(auth.RequireCollaborator(http.HandlerFunc(d.handleStream))))
go test ./internal/server/ -run TestStream -v

Expected: all pass.

git add internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/server.go
git commit -m "wiki-browser: server — GET /api/stream SSE handler (happy path)"

Task 7: Keepalive ticker with session touch

The handler must (a) emit a :keepalive comment every 15 s, (b) close the stream when the session is revoked / expired, (c) call TouchSession to extend the sliding expiry so SSE-only observers don't time out.

To make this testable without a 15 s wait, the keepalive interval comes from Deps (config-driven default, test-time override).

Files:

Append to internal/server/handler_stream_test.go:

func TestStream_keepaliveCommentEmitted(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   30 * time.Millisecond,
	})
	req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 64<<10), 1<<20)
	deadline := time.Now().Add(500 * time.Millisecond)
	sawKeepalive := false
	for scanner.Scan() && time.Now().Before(deadline) {
		if strings.HasPrefix(scanner.Text(), ":keepalive") {
			sawKeepalive = true
			break
		}
	}
	if !sawKeepalive {
		t.Fatal("no :keepalive line emitted within 500ms")
	}
}

func TestStream_doesNotEmitIDLines(t *testing.T) {
	// Reading first frame from the stream and asserting no `id:` line slipped
	// in — protects against accidental Last-Event-ID handling.
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})
	req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 64<<10), 1<<20)
	deadline := time.Now().Add(200 * time.Millisecond)
	for scanner.Scan() && time.Now().Before(deadline) {
		line := scanner.Text()
		if strings.HasPrefix(line, "id:") {
			t.Fatalf("unexpected id: line: %q", line)
		}
	}
}

In internal/server/handler_doc_test.go:

type testServerOptions struct {
	SessionMiddleware func(http.Handler) http.Handler
	AgentService      server.AgentService
	Realtime          *realtime.Hub
	StreamKeepalive   time.Duration
}

Pass it into Deps:

mux := server.Mux(server.Deps{
	// … existing fields …
	StreamKeepalive: opts.StreamKeepalive,
})

Add "time" to imports.

In internal/server/server.go, add to Deps:

// StreamKeepalive controls the SSE keepalive cadence. Zero means use the
// default (15s). Tests override to make assertions fast.
StreamKeepalive time.Duration

Add "time" to imports.

go test ./internal/server/ -run 'TestStream_keepalive|TestStream_doesNotEmitID' -v

Expected: tests fail (handler doesn't emit keepalive).

Replace the loop in internal/server/handler_stream.go:

import (
	// existing imports …
	"time"
)

const defaultKeepaliveInterval = 15 * time.Second

func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
	// … existing validation, subscribe, headers, subscribed frame, Activate …

	keepaliveInterval := d.StreamKeepalive
	if keepaliveInterval <= 0 {
		keepaliveInterval = defaultKeepaliveInterval
	}
	ticker := time.NewTicker(keepaliveInterval)
	defer ticker.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if _, err := fmt.Fprint(w, ":keepalive\n\n"); err != nil {
				return
			}
			flusher.Flush()
		case ev, ok := <-sub.Events():
			if !ok {
				return
			}
			if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}
go test ./internal/server/ -run 'TestStream' -v

Expected: keepalive + no-id tests pass; pre-existing TestStream tests still pass.

git add internal/server/server.go internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/handler_doc_test.go
git commit -m "wiki-browser: server — SSE keepalive ticker"

Task 8: Keepalive ticks touch last_seen_at; revoked/expired sessions close the stream

This adds the H5 fix from the code review: a long-lived SSE connection participates in the sliding-session expiry that auth.SessionMiddleware already implements per-request.

Files:

The handler needs two things from the surrounding wiring:

  1. auth.SessionStoreLookupSession, TouchSession. Already on collab.Store.
  2. The session lifetime (so TouchSession can compute the new expires_at).

No clock injection in v1: tests use a short StreamKeepalive (e.g. 30 ms) and read real wall-clock time inside the handler. If a future test needs deterministic timing for the touch-throttle logic, plumb a NowFunc func() time.Time through Deps then — it's a one-field addition.

Append to internal/server/handler_stream_test.go:

import (
	// existing imports …
	"github.com/getorcha/wiki-browser/internal/auth"
	"github.com/getorcha/wiki-browser/internal/collab"
)

// recordingSessionStore wraps a collab.Store and counts TouchSession calls.
// Per-instance state — never package-level — so each test is isolated.
type recordingSessionStore struct {
	inner *collab.Store

	mu      sync.Mutex
	touches int
}

func (r *recordingSessionStore) LookupSession(idHash string, now time.Time) (collab.Session, bool, error) {
	return r.inner.LookupSession(idHash, now)
}
func (r *recordingSessionStore) TouchSession(idHash string, seenAt, expiresAt time.Time) error {
	r.mu.Lock()
	r.touches++
	r.mu.Unlock()
	return r.inner.TouchSession(idHash, seenAt, expiresAt)
}
func (r *recordingSessionStore) RevokeSession(idHash string, revokedAt time.Time) error {
	return r.inner.RevokeSession(idHash, revokedAt)
}
func (r *recordingSessionStore) RotateSessionCSRF(idHash, csrfHash string) error {
	return r.inner.RotateSessionCSRF(idHash, csrfHash)
}
func (r *recordingSessionStore) Touches() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.touches
}

func TestStream_keepaliveTouchesSession(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)

	// newTestServerWithOptions constructs the recordingSessionStore inside so
	// we get it back as the third return value; this avoids any package-level
	// state that would leak across tests in the same process.
	ts, _, store, rec := newTestServerWithOptionsAndRecordingStore(t, testServerOptions{
		Realtime:            hub,
		StreamKeepalive:     30 * time.Millisecond,
		StreamTouchInterval: 0, // touch every tick
		SessionLifetime:     time.Hour,
	}, "daniel@getorcha.com")
	_ = store

	req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
	req.AddCookie(&http.Cookie{Name: auth.SessionCookieName, Value: "test-session-token"})
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	// Poll the touch counter rather than sleeping a fixed interval.
	deadline := time.Now().Add(500 * time.Millisecond)
	for time.Now().Before(deadline) {
		if rec.Touches() >= 1 {
			return
		}
		time.Sleep(20 * time.Millisecond)
	}
	t.Fatalf("TouchSession not called within 500ms (touches=%d)", rec.Touches())
}

func TestStream_revokedSessionClosesStream(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, store, _ := newTestServerWithOptionsAndRecordingStore(t, testServerOptions{
		Realtime:            hub,
		StreamKeepalive:     30 * time.Millisecond,
		SessionLifetime:     time.Hour,
		StreamTouchInterval: time.Second, // don't touch every tick
	}, "daniel@getorcha.com")

	req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
	req.AddCookie(&http.Cookie{Name: auth.SessionCookieName, Value: "test-session-token"})
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	// Revoke the session in the DB.
	if err := store.RevokeSession(auth.TokenHash("test-session-token"), time.Now()); err != nil {
		t.Fatal(err)
	}

	// Read the response body in a goroutine that closes a channel on EOF.
	// bufio.Scanner blocks on Scan(); a wall-clock loop around it does
	// nothing once Scan is already blocked. The goroutine + select is the
	// only deadline-aware pattern here.
	closed := make(chan struct{})
	go func() {
		defer close(closed)
		scanner := bufio.NewScanner(resp.Body)
		scanner.Buffer(make([]byte, 64<<10), 1<<20)
		for scanner.Scan() {
		}
	}()
	select {
	case <-closed:
		// good — stream returned EOF
	case <-time.After(500 * time.Millisecond):
		t.Fatal("stream did not close within 500ms after session revocation")
	}
}
import (
	// existing imports plus:
	"os"
	"path/filepath"
	"sync"

	"github.com/getorcha/wiki-browser/internal/auth"
	"github.com/getorcha/wiki-browser/internal/index"
	"github.com/getorcha/wiki-browser/internal/render"
	"github.com/getorcha/wiki-browser/internal/server"
	"github.com/getorcha/wiki-browser/internal/walker"
)

// realSessionMiddlewareForStreamTest builds a SessionMiddleware backed by the
// real store; useful for SSE tests that need to exercise revocation paths.
// Also seeds a session row keyed by "test-session-token" so the cookie path
// resolves.
func realSessionMiddlewareForStreamTest(t *testing.T, store *collab.Store, userID string) func(http.Handler) http.Handler {
	t.Helper()
	if err := store.CreateSession(collab.Session{
		IDHash:     auth.TokenHash("test-session-token"),
		UserID:     userID,
		CSRFHash:   auth.TokenHash("csrf"),
		CreatedAt:  time.Now(),
		LastSeenAt: time.Now().Add(-time.Hour), // older than SessionTouchInterval → real middleware touches
		ExpiresAt:  time.Now().Add(time.Hour),
	}); err != nil {
		t.Fatal(err)
	}
	return auth.SessionMiddlewareWithCookieSecure(store, time.Now, time.Hour, false)
}

// newTestServerWithOptionsAndRecordingStore is the touch-test variant.
// Instead of reusing newTestServerWithOptions (which opens its own store),
// we build the fixture from scratch so the recordingSessionStore can wrap the
// same store the Deps see.
func newTestServerWithOptionsAndRecordingStore(t *testing.T, opts testServerOptions, userID string) (*httptest.Server, string, *collab.Store, *recordingSessionStore) {
	t.Helper()
	root := t.TempDir()
	if err := os.WriteFile(filepath.Join(root, "a.md"), []byte("# A\n"), 0o644); err != nil {
		t.Fatal(err)
	}
	w, err := walker.New(walker.Options{Root: root, Extensions: []string{".md"}})
	if err != nil {
		t.Fatal(err)
	}
	idx, err := index.Open(filepath.Join(t.TempDir(), "i.db"))
	if err != nil {
		t.Fatal(err)
	}
	idx.SetRoot(root)
	cache := render.NewCache(4 << 20)
	idx.SetCache(cache)
	t.Cleanup(func() { idx.Close() })

	store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { _ = store.Close() })
	if err := store.UpsertUser(collab.User{ID: userID, DisplayName: "Daniel"}); err != nil {
		t.Fatal(err)
	}

	rec := &recordingSessionStore{inner: store}
	opts.SessionMiddleware = realSessionMiddlewareForStreamTest(t, store, userID)
	opts.SessionStore = rec

	mux := server.Mux(server.Deps{
		Title: "Test", Root: root, Walker: w, Index: idx, Cache: cache, Collab: store,
		SessionMiddleware:   opts.SessionMiddleware,
		AgentService:        opts.AgentService,
		Realtime:            opts.Realtime,
		StreamKeepalive:     opts.StreamKeepalive,
		SessionStore:        opts.SessionStore,
		SessionLifetime:     opts.SessionLifetime,
		StreamTouchInterval: opts.StreamTouchInterval,
	})
	ts := httptest.NewServer(mux)
	t.Cleanup(ts.Close)
	return ts, root, store, rec
}

Have recordingSessionStore.TouchSession call incrementRecordedTouches() in addition to the inner store call.

In internal/server/handler_doc_test.go:

type testServerOptions struct {
	SessionMiddleware   func(http.Handler) http.Handler
	AgentService        server.AgentService
	Realtime            *realtime.Hub
	StreamKeepalive     time.Duration
	SessionStore        auth.SessionStore
	SessionLifetime     time.Duration
	StreamTouchInterval time.Duration
}

In server.go:

// SessionStore lets the stream handler revalidate the session on each
// keepalive tick. Required when StreamSessionMiddleware is non-nil.
SessionStore auth.SessionStore

// SessionLifetime is the rolling TTL used when the stream handler touches the
// session. Defaults to 30 days when zero (matching SessionMiddleware).
SessionLifetime time.Duration

// StreamTouchInterval throttles TouchSession calls on keepalive ticks.
// Default: auth.SessionTouchInterval (10 min). Tests override to verify the
// touch path.
StreamTouchInterval time.Duration

Pass SessionStore: store from cmd/wiki-browser/main.go (use the existing collabStore — it already satisfies auth.SessionStore).

go test ./internal/server/ -run 'TestStream_keepaliveTouchesSession|TestStream_revokedSessionClosesStream' -v

Expected: tests fail (handler ignores session state).

In internal/server/handler_stream.go, refactor the loop so each keepalive tick:

  1. LookupSession(sessionInfo.IDHash, time.Now()) — if missing/expired, return.
  2. If time.Since(session.LastSeenAt) >= StreamTouchInterval, call TouchSession.
  3. Write :keepalive and flush.
func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
	// … existing validation, Subscribe, headers, subscribed frame, Activate …

	keepaliveInterval := d.StreamKeepalive
	if keepaliveInterval <= 0 {
		keepaliveInterval = defaultKeepaliveInterval
	}
	touchInterval := d.StreamTouchInterval
	if touchInterval < 0 {
		touchInterval = 0
	}
	if d.StreamTouchInterval == 0 && d.SessionStore != nil {
		// Zero from a real config = use the package default.
		touchInterval = auth.SessionTouchInterval
	}
	sessionLifetime := d.SessionLifetime
	if sessionLifetime <= 0 {
		sessionLifetime = 30 * 24 * time.Hour
	}

	ticker := time.NewTicker(keepaliveInterval)
	defer ticker.Stop()

	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case now := <-ticker.C:
			if d.SessionStore != nil && sessionInfo.IDHash != "" {
				session, ok, err := d.SessionStore.LookupSession(sessionInfo.IDHash, now)
				if err != nil || !ok {
					return // session revoked or expired
				}
				if now.Sub(session.LastSeenAt) >= touchInterval {
					if err := d.SessionStore.TouchSession(sessionInfo.IDHash, now, now.Add(sessionLifetime)); err != nil {
						// non-fatal; same policy as SessionMiddleware
					}
				}
			}
			if _, err := fmt.Fprint(w, ":keepalive\n\n"); err != nil {
				return
			}
			flusher.Flush()
		case ev, ok := <-sub.Events():
			if !ok {
				return
			}
			if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

Add "github.com/getorcha/wiki-browser/internal/auth" to the imports (for auth.SessionTouchInterval).

go test ./internal/server/ -run TestStream -v

Expected: all pass.

In cmd/wiki-browser/main.go, in the server.Mux(server.Deps{ ... }) block, add:

SessionStore:    collabStore,
SessionLifetime: sessionLifetime,
git add internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/handler_doc_test.go internal/server/server.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: server — SSE keepalive touches session, closes on revocation"

Task 9: POST /api/stream/focus — subscriber_id lookup, rate limiter, terminal-Topic coercion

Files:

To support the focus endpoint, the hub needs a way to look up a subscriber by id, scoped to a session (so a hijacked id from another principal can't be used).

Append to internal/server/handler_stream_test.go:

func TestStreamFocus_setsFocusAndPublishesPresence(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	// Open the stream and read the subscribed handshake.
	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	ev := <-ch
	var sub struct {
		SubscriberID string `json:"subscriber_id"`
	}
	if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil {
		t.Fatal(err)
	}
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	// Create a Topic to focus on.
	resp := postWithCSRF(t, ts.URL+"/api/topics", `{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
	defer resp.Body.Close()
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}

	body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":"` + created.ID + `"}`
	focusResp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
	defer focusResp.Body.Close()
	if focusResp.StatusCode != http.StatusNoContent {
		t.Fatalf("focus status = %d body=%s", focusResp.StatusCode, readAll(t, focusResp))
	}

	// The next presence.updated frame should show our focused_topic_id.
	pev := waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
	if !strings.Contains(pev.Data, `"focused_topic_id":"`+created.ID+`"`) {
		t.Fatalf("presence missing focus: %s", pev.Data)
	}
}

func TestStreamFocus_unknownSubscriberReturns404(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
	})
	resp := postWithCSRF(t, ts.URL+"/api/stream/focus", `{"subscriber_id":"bogus","topic_id":""}`)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusNotFound {
		t.Fatalf("status = %d, want 404", resp.StatusCode)
	}
}

func TestStreamFocus_terminalTopicCoercesToEmpty(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	// Open + handshake.
	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	ev := <-ch
	var sub struct{ SubscriberID string `json:"subscriber_id"` }
	if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil || sub.SubscriberID == "" {
		t.Fatalf("decode subscribed: err=%v data=%q", err, ev.Data)
	}
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	// Create + discard the Topic.
	resp := postWithCSRF(t, ts.URL+"/api/topics", `{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}
	resp.Body.Close()
	resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/discard", `{}`)
	resp.Body.Close()

	// Focus on the discarded Topic must succeed silently with 204; the
	// resulting presence frame must NOT include the discarded topic_id.
	body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":"` + created.ID + `"}`
	focusResp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
	defer focusResp.Body.Close()
	if focusResp.StatusCode != http.StatusNoContent {
		t.Fatalf("focus status = %d body=%s", focusResp.StatusCode, readAll(t, focusResp))
	}
}

func TestStreamFocus_rateLimiterRejectsBurst(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	ev := <-ch
	var sub struct{ SubscriberID string `json:"subscriber_id"` }
	if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil || sub.SubscriberID == "" {
		t.Fatalf("decode subscribed: err=%v data=%q", err, ev.Data)
	}
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":""}`
	var lastStatus int
	for i := 0; i < 5; i++ {
		resp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
		lastStatus = resp.StatusCode
		resp.Body.Close()
	}
	if lastStatus != http.StatusTooManyRequests {
		t.Fatalf("after burst, last status = %d, want 429", lastStatus)
	}
}

In internal/realtime/hub.go:

// LookupSubscriber returns the subscription matching id, scoped to the calling
// session. Returns nil if not found OR if the subscription's session does not
// match — the latter defends against a hijacked id from another principal.
func (h *Hub) LookupSubscriber(id, sessionID string) *Subscriber {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, set := range h.sources {
		for s := range set {
			if s.id == id && s.princ.SessionID == sessionID {
				return s
			}
		}
	}
	return nil
}

Why this isn't keyed by source_path: the SSE handler binds (subscriber_id, source_path, session) at Subscribe time, so the source can be inferred from the subscriber rather than supplied by the client. Reduces request-shape surface.

Add to internal/server/handler_stream.go:

import (
	// existing imports …
	"database/sql"
	"errors"
	"sync"
)

type streamFocusRequest struct {
	SubscriberID string `json:"subscriber_id"`
	TopicID      string `json:"topic_id,omitempty"`
}

const focusLimitInterval = time.Second

// focusLimiter is the 1-call/sec token bucket keyed by (session, subscriber).
// Instance state — lives inside Deps (constructed in Mux) so each httptest.
// Server in the test suite gets its own bucket map. Package-level state would
// cross-contaminate the test binary, which runs all internal/server tests in
// one process.
type focusLimiter struct {
	mu   sync.Mutex
	last map[string]time.Time
}

func newFocusLimiter() *focusLimiter {
	return &focusLimiter{last: map[string]time.Time{}}
}

func (f *focusLimiter) allow(key string, now time.Time) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	prev, ok := f.last[key]
	if ok && now.Sub(prev) < focusLimitInterval {
		return false
	}
	f.last[key] = now
	return true
}

func (d Deps) handleStreamFocus(w http.ResponseWriter, r *http.Request) {
	if d.Realtime == nil || d.Collab == nil {
		writeJSONError(w, http.StatusServiceUnavailable, "collab_unavailable")
		return
	}
	var req streamFocusRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		writeJSONError(w, http.StatusBadRequest, "bad_json")
		return
	}
	if req.SubscriberID == "" {
		writeJSONError(w, http.StatusBadRequest, "bad_request")
		return
	}

	sessionInfo, _ := auth.SessionFrom(r.Context())
	sub := d.Realtime.LookupSubscriber(req.SubscriberID, sessionInfo.IDHash)
	if sub == nil {
		writeJSONError(w, http.StatusNotFound, "unknown_subscriber")
		return
	}

	limitKey := sessionInfo.IDHash + ":" + sub.ID()
	if d.FocusLimiter == nil {
		writeJSONError(w, http.StatusInternalServerError, "limiter_misconfigured")
		return
	}
	if !d.FocusLimiter.allow(limitKey, time.Now()) {
		writeJSONError(w, http.StatusTooManyRequests, "too_many_focus_calls")
		return
	}

	if req.TopicID != "" {
		state, err := d.Collab.GetTopicState(req.TopicID)
		switch {
		case errors.Is(err, sql.ErrNoRows):
			writeJSONError(w, http.StatusNotFound, "unknown_topic")
			return
		case err != nil:
			writeJSONError(w, http.StatusInternalServerError, "topic_lookup_failed")
			return
		case state.SourcePath != sub.Source():
			writeJSONError(w, http.StatusNotFound, "unknown_topic")
			return
		case !state.Open:
			// Terminal Topic — coerce to empty rather than 409. The matching
			// topic.discarded / topic.incorporated event is in flight; the
			// chrome will repaint anyway.
			req.TopicID = ""
		}
	}
	sub.SetFocus(req.TopicID)
	w.WriteHeader(http.StatusNoContent)
}

Add a field to Deps:

// FocusLimiter throttles POST /api/stream/focus per (session, subscriber).
// Mux populates a default; tests can override.
FocusLimiter *focusLimiter

Inside func Mux(d Deps) *http.ServeMux, before the routes:

if d.FocusLimiter == nil {
	d.FocusLimiter = newFocusLimiter()
}

Then the route itself:

mux.Handle("POST /api/stream/focus",
	d.withSession(auth.RequireCollaborator(auth.RequireCSRF(http.HandlerFunc(d.handleStreamFocus)))))
go test ./internal/server/ -run 'TestStreamFocus' -v

Expected: all pass.

git add internal/realtime/hub.go internal/server/handler_stream.go internal/server/server.go internal/server/handler_stream_test.go
git commit -m "wiki-browser: server — POST /api/stream/focus"

Task 10: handleCreateTopic publishes topic.created

Files:

Append to internal/server/topics_test.go:

import (
	// existing imports …
	"time"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

func TestCreateTopic_publishesTopicCreated(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	<-ch // subscribed
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"hello world"}`)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusCreated {
		t.Fatalf("create status = %d", resp.StatusCode)
	}
	ev := waitForEvent(t, ch, "topic.created", 200*time.Millisecond)
	if !strings.Contains(ev.Data, `"first_message_preview":"hello world"`) {
		t.Fatalf("event missing preview: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"source_path":"a.md"`) {
		t.Fatalf("event missing source_path: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"anchor_kind":"global"`) {
		t.Fatalf("event missing anchor_kind: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"created_by":"daniel@getorcha.com"`) {
		t.Fatalf("event missing created_by: %s", ev.Data)
	}
}
go test ./internal/server/ -run TestCreateTopic_publishesTopicCreated -v

Expected: timeout waiting for topic.created (handler doesn't publish).

In internal/server/topics.go, after the successful InsertTopicWithFirstMessage:

import (
	// existing imports …
	"strings"
	"time"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

func topicCreatedPayload(topicID, sourcePath, anchorKind, firstMessageBody, createdBy string, now time.Time) map[string]any {
	return map[string]any{
		"topic_id":              topicID,
		"source_path":           sourcePath,
		"anchor_kind":           anchorKind,
		"first_message_preview": preview(firstMessageBody, 160),
		"created_by":            createdBy,
		"created_at":            now.Unix(),
	}
}

// preview truncates s to at most max bytes, appending an ellipsis if cut.
// Uses rune-aware truncation so multibyte content doesn't split mid-codepoint.
func preview(s string, max int) string {
	if len(s) <= max {
		return s
	}
	runes := []rune(s)
	if len(runes) <= max {
		return s
	}
	if max < 1 {
		return ""
	}
	return string(runes[:max-1]) + "…"
}

Inside handleCreateTopic, just before the final writeJSON(w, http.StatusCreated, …):

if d.Realtime != nil {
	anchorKind := "anchored"
	if req.Global {
		anchorKind = "global"
	}
	d.Realtime.Publish(req.SourcePath, realtime.Event{
		Type:    "topic.created",
		Payload: topicCreatedPayload(topicID, req.SourcePath, anchorKind, req.FirstMessageBody, principal.UserID, time.Now()),
	})
}
go test ./internal/server/ -run TestCreateTopic -v

Expected: all pass.

git add internal/server/topics.go internal/server/topics_test.go
git commit -m "wiki-browser: server — handleCreateTopic publishes topic.created"

Task 11: handleAppendTopicMessage publishes topic.message_appended

Files:

Payload carries proposal_id? (always null for human-authored messages; non-null only for the agent-proposal case which Task 14 covers). Sequence comes back from InsertMessage indirectly — we need to read it via a follow-up SELECT. Since InsertMessage doesn't return the sequence today, add a helper that reads the row by (topic_id, id) after the insert.

Actually, since the sequence is consulted only for the SSE event payload (REST consumers fetch via /api/topics/{id}/messages), and the SSE consumer is going to refetch anyway, we have a choice: omit sequence from the payload, or extend InsertMessage's signature.

Simpler: extend InsertMessage to return (id, sequence, error). The spec event surface requires it.

Append to internal/server/topics_test.go:

func TestAppendMessage_publishesTopicMessageAppended(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	// Create a topic to attach the message to.
	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"first"}`)
	defer resp.Body.Close()
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}

	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	<-ch // subscribed
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/messages", `{"body":"second"}`)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusCreated {
		t.Fatalf("append status = %d", resp.StatusCode)
	}

	ev := waitForEvent(t, ch, "topic.message_appended", 200*time.Millisecond)
	if !strings.Contains(ev.Data, `"topic_id":"`+created.ID+`"`) {
		t.Fatalf("missing topic_id: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"kind":"human"`) {
		t.Fatalf("missing kind: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"body_preview":"second"`) {
		t.Fatalf("missing body_preview: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"author_user_id":"daniel@getorcha.com"`) {
		t.Fatalf("missing author_user_id: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"sequence":2`) {
		t.Fatalf("missing/wrong sequence: %s", ev.Data)
	}
	if strings.Contains(ev.Data, `"proposal_id"`) {
		t.Fatalf("proposal_id should be omitted for human messages: %s", ev.Data)
	}
}

In internal/collab/mutators.go, change InsertMessage's signature:

// InsertMessage allocates the next per-topic sequence and inserts the row.
// Returns (id, sequence, error). Callers that need the sequence for an event
// payload read it from the return value; older callers can ignore it.
func (s *Store) InsertMessage(m NewMessage) (string, int, error) {
	if err := validateMessage(m); err != nil {
		return "", 0, err
	}
	var next int
	err := s.send(func(db *sql.DB) error {
		err := db.QueryRow(
			`SELECT COALESCE(MAX(sequence), 0) + 1
			 FROM topic_messages WHERE topic_id = ?`,
			m.TopicID,
		).Scan(&next)
		if err != nil {
			return fmt.Errorf("allocate sequence: %w", err)
		}
		_, err = db.Exec(
			`INSERT INTO topic_messages(
			   id, topic_id, kind, body, author_user_id, proposal_id,
			   sequence, created_at
			 ) VALUES (?, ?, ?, ?, ?, ?, ?, unixepoch())`,
			m.ID, m.TopicID, m.Kind, m.Body, m.AuthorUserID, m.ProposalID, next,
		)
		return err
	})
	if err != nil {
		return "", 0, err
	}
	return m.ID, next, nil
}

Update every caller of InsertMessage. Use:

grep -rn '\.InsertMessage(' internal cmd

The callers are limited (chiefly in internal/server/topics.go and cmd/wb-agent); fix them all to the three-value signature, discarding _ where the sequence isn't needed.

In internal/server/topics.go, replace the IsTopicOpen switch with a single GetTopicState call (caches source_path for the publish), then replace the InsertMessage call:

state, err := d.Collab.GetTopicState(topicID)
switch {
case errors.Is(err, sql.ErrNoRows):
	writeJSONError(w, http.StatusNotFound, "unknown_topic")
	return
case err != nil:
	writeJSONError(w, http.StatusInternalServerError, "lookup_failed")
	return
case !state.Open:
	writeJSONError(w, http.StatusGone, "topic_closed")
	return
}

var req messageRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
	writeJSONError(w, http.StatusBadRequest, "bad_json")
	return
}
if err := validateBody(req.Body); err != nil {
	writeJSONError(w, http.StatusBadRequest, "body_required")
	return
}
id := uuid.NewString()
principal, _ := auth.PrincipalFrom(r.Context())
author := principal.UserID
_, seq, err := d.Collab.InsertMessage(collab.NewMessage{
	ID: id, TopicID: topicID, Kind: "human", Body: req.Body, AuthorUserID: &author,
})
if err != nil {
	writeJSONError(w, http.StatusInternalServerError, "insert_message_failed")
	return
}
if d.Realtime != nil {
	d.Realtime.Publish(state.SourcePath, realtime.Event{
		Type: "topic.message_appended",
		Payload: map[string]any{
			"topic_id":       topicID,
			"message_id":     id,
			"sequence":       seq,
			"kind":           "human",
			"body_preview":   preview(req.Body, 160),
			"author_user_id": author,
			"created_at":     time.Now().Unix(),
		},
	})
}
writeJSON(w, http.StatusCreated, map[string]string{"id": id})
go test ./internal/server/ -run TestAppendMessage -v
go test ./internal/server/ -run TestTopicsAPI -v
go test ./internal/collab/ ./internal/agent/ -v

Expected: all pass. The signature change to InsertMessage is the most likely break point — fix any remaining callers.

InsertMessage's signature change touches every caller in the tree. cmd/wb-agent/main.go writes via raw SQL today, so only the test file there changes; cmd/wb-agent/ is added wholesale because the same signature change ripples through cmd/wb-agent/main_test.go. Production binary code in cmd/wb-agent is unchanged.

git add \
  internal/collab/mutators.go \
  internal/collab/mutators_test.go \
  internal/collab/reader_test.go \
  internal/server/topics.go \
  internal/server/topics_test.go \
  internal/server/e2e_resolve_test.go \
  internal/agent/service_test.go \
  internal/agent/e2e_test.go \
  cmd/wb-agent/main_test.go
git commit -m "wiki-browser: server — handleAppendTopicMessage publishes topic.message_appended"

(Run grep -rn '\.InsertMessage(' internal cmd after the signature change to confirm every caller has been updated to the three-value return.)


Task 12: handleDiscardTopic publishes topic.discarded

Files:

Append to internal/server/handler_stream_test.go:

func TestDiscardTopic_publishesTopicDiscarded(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	// Create a topic.
	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}
	resp.Body.Close()

	ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancel()
	<-ch
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/discard", `{"reason":"never mind"}`)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Fatalf("discard status = %d", resp.StatusCode)
	}
	ev := waitForEvent(t, ch, "topic.discarded", 200*time.Millisecond)
	if !strings.Contains(ev.Data, `"topic_id":"`+created.ID+`"`) {
		t.Fatalf("missing topic_id: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"discarded_by":"daniel@getorcha.com"`) {
		t.Fatalf("missing discarded_by: %s", ev.Data)
	}
}
go test ./internal/server/ -run TestDiscardTopic_publishesTopicDiscarded -v

In internal/server/handler_proposals.go, after the successful DiscardTopic call:

import (
	// existing imports …
	"github.com/getorcha/wiki-browser/internal/realtime"
)

// inside handleDiscardTopic, after writeJSON(...) but before the return — or
// reshape so the response is written last:
if d.Realtime != nil {
	d.Realtime.Publish(state.SourcePath, realtime.Event{
		Type: "topic.discarded",
		Payload: map[string]any{
			"topic_id":     id,
			"discarded_by": principal.UserID,
			"discarded_at": discardedAt,
		},
	})
}
go test ./internal/server/ -v

Expected: pass.

git add internal/server/handler_proposals.go internal/server/handler_stream_test.go
git commit -m "wiki-browser: server — handleDiscardTopic publishes topic.discarded"

Task 13: handleIncorporate publishes topic.incorporated

Files:

The existing newFullStackFake fixture in e2e_resolve_test.go already stands up a real collab.Store + real agent.Service + FakeRunner + server.Mux. We extend it once to accept a hub, then add a new test that asserts the published event sequence end-to-end. Because the same hub also drives Task 14's agent-runner publishes, this test ends up covering both the approval-side topic.incorporated AND the runner-side topic.message_appended / proposal.created / job.updated once Task 14 is implemented.

In internal/server/e2e_resolve_test.go, modify the function signature and the two Deps it builds:

import (
	// existing imports …
	"github.com/getorcha/wiki-browser/internal/realtime"
)

func newFullStackFake(t *testing.T, proposalSource, explanation string) *fullStackEnv {
	return newFullStackFakeWithHub(t, proposalSource, explanation, nil)
}

func newFullStackFakeWithHub(t *testing.T, proposalSource, explanation string, hub *realtime.Hub) *fullStackEnv {
	// … existing body verbatim …

	service := agent.NewService(agent.ServiceConfig{
		// … existing fields …
		Realtime: hub, // new
	})
	t.Cleanup(service.Stop)

	// … existing walker/index/cache …

	mux := server.Mux(server.Deps{
		// … existing fields …
		Realtime: hub, // new
	})
	// … rest of the body unchanged …
}

The existing newFullStackFake callers continue to work (hub is nil for them, publishing is silently disabled).

Append to internal/server/e2e_resolve_test.go:

func TestE2EResolveFlow_publishesTopicIncorporated(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	env := newFullStackFakeWithHub(t, "alpha\nrewritten line\n", "explanation body", hub)

	ch, cancel := openSSE(t, env.ts.URL+"/api/stream?source_path=docs/x.md")
	defer cancel()
	<-ch // subscribed
	waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)

	topicID := env.createTopic(t, "please rewrite")
	jobID := env.proposeRewrite(t, topicID)
	env.waitForJob(t, jobID)

	// Runner emits topic.message_appended (agent-proposal) + proposal.created
	// + job.updated{succeeded} once Task 14 is implemented. Skip ahead to the
	// approval event so this test passes after Task 13 alone, then strengthen
	// after Task 14 lands.
	proposals := env.listProposals(t, topicID)
	if len(proposals) != 1 {
		t.Fatalf("proposals = %d", len(proposals))
	}
	commitSHA := env.approve(t, proposals[0].ID)

	ev := waitForEvent(t, ch, "topic.incorporated", 500*time.Millisecond)
	if !strings.Contains(ev.Data, `"topic_id":"`+topicID+`"`) {
		t.Fatalf("missing topic_id: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"source_path":"docs/x.md"`) {
		t.Fatalf("missing source_path: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"commit_sha":"`+commitSHA+`"`) {
		t.Fatalf("missing commit_sha: %s", ev.Data)
	}
	if !strings.Contains(ev.Data, `"incorporated_by":"daniel@getorcha.com"`) {
		t.Fatalf("missing incorporated_by: %s", ev.Data)
	}
}

The proposalSummary struct returned by env.listProposals and the env.approve signature already exist in e2e_resolve_test.go — read the helper definitions there if the field names need confirming.

go test ./internal/server/ -run TestE2EResolveFlow_publishesTopicIncorporated -v

Expected: the test times out waiting for topic.incorporated because handleIncorporate doesn't publish yet.

In internal/server/handler_proposals.go, add the import:

import (
	// existing imports …
	"time"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

After the successful collab.Incorporate(...) call (right before the final writeJSON(...)):

if d.Realtime != nil {
	d.Realtime.Publish(state.SourcePath, realtime.Event{
		Type: "topic.incorporated",
		Payload: map[string]any{
			"topic_id":        state.ID,
			"source_path":     state.SourcePath,
			"commit_sha":      commitSHA,
			"incorporated_by": principal.UserID,
			"incorporated_at": time.Now().Unix(),
		},
	})
}
go test ./internal/server/ -v

Expected: all pass, including the new test.

git add internal/server/handler_proposals.go internal/server/e2e_resolve_test.go
git commit -m "wiki-browser: server — handleIncorporate publishes topic.incorporated"

Task 14: Agent runner publishes the pinned-order event sequence

This is the central piece for #4 ↔ #6 integration. After CompleteJob succeeds:

Files:

Append to internal/agent/e2e_test.go. This test reuses the existing TestEndToEnd_IncorporateProducesProposal scaffolding pattern: a real collab.Store, a FakeRunner that inserts the proposal + agent-proposal message, and a stub hub that records every Publish call so we can assert the order.

import (
	// existing imports …
	"sync"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

// recordingHub captures Publish calls for assertion. Satisfies
// agent.HubPublisher.
type recordingHub struct {
	mu     sync.Mutex
	events []recordedEvent
}
type recordedEvent struct {
	Source string
	Type   string
	Status string // job.updated payload's status (when applicable)
}

func (r *recordingHub) Publish(source string, evt realtime.Event) {
	r.mu.Lock()
	defer r.mu.Unlock()
	rec := recordedEvent{Source: source, Type: evt.Type}
	if evt.Type == "job.updated" {
		if m, ok := evt.Payload.(map[string]any); ok {
			if s, _ := m["status"].(string); s != "" {
				rec.Status = s
			}
		}
	}
	r.events = append(r.events, rec)
}
func (r *recordingHub) snapshot() []recordedEvent {
	r.mu.Lock()
	defer r.mu.Unlock()
	out := make([]recordedEvent, len(r.events))
	copy(out, r.events)
	return out
}

func TestEndToEnd_IncorporatePublishesPinnedEventOrderOnSuccess(t *testing.T) {
	store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { _ = store.Close() })
	if _, err := store.RawDBForTest().Exec(
		`INSERT INTO users(id, display_name, created_at) VALUES ('u1','U1', unixepoch())`,
	); err != nil {
		t.Fatal(err)
	}
	if err := store.InsertTopicWithFirstMessage(collab.NewTopicWithFirstMessage{
		TopicID: "t1", SourcePath: "docs/foo.md",
		Anchor:    []byte(`{"kind":"global"}`),
		CreatedBy: "u1", FirstMessageID: "m1", FirstMessageBody: "rewrite foo",
	}); err != nil {
		t.Fatal(err)
	}

	runner := agent.NewFakeRunner(func(ctx context.Context, j agent.Job) agent.RunResult {
		const fakeSHA = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
		propID := "p1"
		if _, err := store.InsertProposal(collab.NewProposal{
			ID: propID, TopicID: "t1", RevisionNumber: 1,
			ProposedSource: "fake new source", BaseSourceSHA: fakeSHA,
			AgentJobID: &j.ID,
		}); err != nil {
			return agent.RunResult{ExitCode: 1, ErrorTail: err.Error()}
		}
		if _, _, err := store.InsertMessage(collab.NewMessage{
			ID: "msg-p1", TopicID: "t1", Kind: "agent-proposal",
			Body: "proposal explanation", ProposalID: &propID,
		}); err != nil {
			return agent.RunResult{ExitCode: 1, ErrorTail: err.Error()}
		}
		return agent.RunResult{ExitCode: 0}
	})

	hub := &recordingHub{}
	svc := agent.NewService(agent.ServiceConfig{
		Store: store, Runner: runner, MaxConcurrentJobs: 1,
		IncorporateTimeout: 2 * time.Second, PerspectiveTimeout: 2 * time.Second,
		RepoRoot: t.TempDir(), WikiRoot: t.TempDir(), WBAgentPath: "/wb-agent",
		Realtime: hub,
	})
	t.Cleanup(svc.Stop)

	jobID, err := svc.Submit(agent.SubmitInput{
		Kind: "incorporate", SourcePath: "docs/foo.md", TopicID: "t1",
	})
	if err != nil {
		t.Fatal(err)
	}

	// publishJobOutcome runs AFTER CompleteJob writes status=succeeded, so
	// polling for status alone races with the three publishes. Instead, wait
	// for the third event to arrive (or the deadline to expire). Status check
	// is done after, as a sanity guard.
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if len(hub.snapshot()) == 3 {
			break
		}
		time.Sleep(20 * time.Millisecond)
	}
	if j, err := store.GetJob(jobID); err != nil || j.Status != "succeeded" {
		t.Fatalf("job status = %v, err = %v", j.Status, err)
	}

	got := hub.snapshot()
	if len(got) != 3 {
		t.Fatalf("event count = %d, want 3 (got %+v)", len(got), got)
	}
	want := []string{"topic.message_appended", "proposal.created", "job.updated"}
	for i, w := range want {
		if got[i].Type != w {
			t.Fatalf("event[%d] = %q, want %q (all: %+v)", i, got[i].Type, w, got)
		}
		if got[i].Source != "docs/foo.md" {
			t.Fatalf("event[%d].Source = %q, want docs/foo.md", i, got[i].Source)
		}
	}
	if got[2].Status != "succeeded" {
		t.Fatalf("job.updated.status = %q, want succeeded", got[2].Status)
	}
}

func TestEndToEnd_IncorporatePublishesOnlyJobUpdatedOnFailure(t *testing.T) {
	store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { _ = store.Close() })
	if _, err := store.RawDBForTest().Exec(
		`INSERT INTO users(id, display_name, created_at) VALUES ('u1','U1', unixepoch())`,
	); err != nil {
		t.Fatal(err)
	}
	if err := store.InsertTopicWithFirstMessage(collab.NewTopicWithFirstMessage{
		TopicID: "t1", SourcePath: "docs/foo.md",
		Anchor:    []byte(`{"kind":"global"}`),
		CreatedBy: "u1", FirstMessageID: "m1", FirstMessageBody: "rewrite foo",
	}); err != nil {
		t.Fatal(err)
	}

	// Runner that exits non-zero — classify() sets status=failed; the
	// post-invariant check never runs because the exit code already trips.
	runner := agent.NewFakeRunner(func(ctx context.Context, j agent.Job) agent.RunResult {
		return agent.RunResult{ExitCode: 2, ErrorTail: "boom"}
	})

	hub := &recordingHub{}
	svc := agent.NewService(agent.ServiceConfig{
		Store: store, Runner: runner, MaxConcurrentJobs: 1,
		IncorporateTimeout: 2 * time.Second, PerspectiveTimeout: 2 * time.Second,
		RepoRoot: t.TempDir(), WikiRoot: t.TempDir(), WBAgentPath: "/wb-agent",
		Realtime: hub,
	})
	t.Cleanup(svc.Stop)

	jobID, err := svc.Submit(agent.SubmitInput{
		Kind: "incorporate", SourcePath: "docs/foo.md", TopicID: "t1",
	})
	if err != nil {
		t.Fatal(err)
	}
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if len(hub.snapshot()) == 1 {
			break
		}
		time.Sleep(20 * time.Millisecond)
	}
	if j, err := store.GetJob(jobID); err != nil || j.Status != "failed" {
		t.Fatalf("job status = %v, err = %v", j.Status, err)
	}

	got := hub.snapshot()
	if len(got) != 1 || got[0].Type != "job.updated" || got[0].Status != "failed" {
		t.Fatalf("failure path published: %+v", got)
	}
}

To avoid service_test.go importing the concrete *realtime.Hub, declare a narrow interface in internal/agent/service.go:

// HubPublisher is the narrow surface the runner uses from realtime.Hub.
// *realtime.Hub satisfies this; tests may substitute a recording fake.
type HubPublisher interface {
	Publish(sourcePath string, evt realtime.Event)
}

Change ServiceConfig.Realtime to Realtime HubPublisher.

In cmd/wiki-browser/main.go, the existing assignment still works because *realtime.Hub satisfies the interface.

After the CompleteJob call:

// After s.cfg.Store.CompleteJob(...) returns successfully, publish events.
// Order matters for incorporate jobs (see decisions log: "Publish-after-
// commit, with pinned ordering for Agent incorporate jobs").
if s.cfg.Realtime != nil {
	s.publishJobOutcome(in, jobID, status)
}

The incorporate artifact events are all-or-nothing. If either post-commit read fails unexpectedly, skip topic.message_appended and proposal.created; still publish the authoritative job.updated fallback. This preserves the pinned ordering contract and avoids shipping a partial artifact sequence.

Add this method:

func (s *Service) publishJobOutcome(in SubmitInput, jobID, status string) {
	if in.Kind == "incorporate" && status == "succeeded" {
		// Both reads MUST succeed for a succeeded incorporate job — the
		// post-invariant check in classify() already ran them. A transient DB
		// error here would silently drop topic.message_appended +
		// proposal.created while job.updated{succeeded} still shipped, exactly
		// the flicker the pinned-order decision was designed to prevent.
		// Log loudly; the order-pin contract is load-bearing for the spec.
		prop, err := s.cfg.Store.GetProposalByJobID(jobID)
		if err != nil {
			slog.Warn("agent: publish-after-commit: GetProposalByJobID failed",
				"job_id", jobID, "err", err)
		} else if msg, err := s.cfg.Store.GetAgentProposalMessage(prop.ID); err != nil {
			slog.Warn("agent: publish-after-commit: GetAgentProposalMessage failed",
				"job_id", jobID, "proposal_id", prop.ID, "err", err)
		} else {
			s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
				Type: "topic.message_appended",
				Payload: map[string]any{
					"topic_id":     in.TopicID,
					"message_id":   msg.ID,
					"sequence":     msg.Sequence,
					"kind":         "agent-proposal",
					"body_preview": preview(msg.Body, 160),
					"proposal_id":  prop.ID,
					"created_at":   msg.CreatedAt, // already unix seconds
				},
			})
			s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
				Type: "proposal.created",
				Payload: map[string]any{
					"proposal_id":     prop.ID,
					"topic_id":        prop.TopicID,
					"source_path":     in.SourcePath,
					"revision_number": prop.RevisionNumber,
					"agent_job_id":    jobID,
				},
			})
		}
	}
	payload := map[string]any{
		"job_id": jobID,
		"kind":   in.Kind,
		"status": status,
	}
	if in.TopicID != "" {
		payload["topic_id"] = in.TopicID
	}
	if in.PersonaName != "" {
		payload["persona_name"] = in.PersonaName
	}
	s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
		Type:    "job.updated",
		Payload: payload,
	})
}

// preview lives in this package so the agent can share it with the server's
// payload builders without an import. Mirrors internal/server.preview.
func preview(s string, max int) string {
	if len(s) <= max {
		return s
	}
	runes := []rune(s)
	if len(runes) <= max {
		return s
	}
	if max < 1 {
		return ""
	}
	return string(runes[:max-1]) + "…"
}

NOTE. The errors from GetProposalByJobID and GetAgentProposalMessage are deliberately swallowed only for the artifact events: if classification passed but the rows can't be read, the post-invariant check should have caught it first and turned the status to failed. For a defensive fallback, log the failed read, skip the incomplete artifact sequence, and still publish job.updated so clients converge through REST refresh.

In the agent.NewService(...) call, add:

Realtime: hub,
go test ./internal/agent/ -v
go test ./internal/server/ -v

Expected: all pass. The agent's existing e2e_test.go should continue to pass; with the recordingHub wired in, the order assertions become testable in a follow-up.

git add internal/agent/service.go internal/agent/service_test.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: agent — publish pinned event sequence on incorporate completion"

Task 15: End-to-end test — two simulated clients on the same Document

This is the integration test that proves the wiring works for the full event surface. Two SSE clients open against the same source; each mutation triggers the expected events on both.

Files:

package server_test

import (
	"encoding/json"
	"net/http"
	"strings"
	"testing"
	"time"

	"github.com/getorcha/wiki-browser/internal/realtime"
)

func TestE2E_twoClients_topicCreated_bothReceiveTheEvent(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelA()
	b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelB()
	<-a
	<-b
	waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
	waitForEvent(t, b, "presence.updated", 200*time.Millisecond)

	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"shared topic"}`)
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusCreated {
		t.Fatalf("create status = %d", resp.StatusCode)
	}

	ea := waitForEvent(t, a, "topic.created", 200*time.Millisecond)
	eb := waitForEvent(t, b, "topic.created", 200*time.Millisecond)
	for _, ev := range []sseEvent{ea, eb} {
		if !strings.Contains(ev.Data, `"first_message_preview":"shared topic"`) {
			t.Fatalf("event missing preview: %s", ev.Data)
		}
	}
}

func TestE2E_twoClients_messageAppend_bothReceive(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"first"}`)
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}
	resp.Body.Close()

	a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelA()
	b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelB()
	<-a
	<-b
	waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
	waitForEvent(t, b, "presence.updated", 200*time.Millisecond)

	resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/messages", `{"body":"second"}`)
	resp.Body.Close()

	waitForEvent(t, a, "topic.message_appended", 200*time.Millisecond)
	waitForEvent(t, b, "topic.message_appended", 200*time.Millisecond)
}

func TestE2E_presence_focusReflectedInOtherClient(t *testing.T) {
	hub := realtime.NewHub()
	t.Cleanup(hub.Close)
	ts, _, _ := newTestServerWithOptions(t, testServerOptions{
		SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
		Realtime:          hub,
		StreamKeepalive:   1 * time.Second,
	})

	// Create a topic to focus on.
	resp := postWithCSRF(t, ts.URL+"/api/topics",
		`{"source_path":"a.md","global":true,"first_message_body":"focus me"}`)
	var created struct{ ID string `json:"id"` }
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
		t.Fatalf("decode topic create: err=%v", err)
	}
	resp.Body.Close()

	a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelA()
	b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
	defer cancelB()

	evA := <-a
	var subA struct{ SubscriberID string `json:"subscriber_id"` }
	json.Unmarshal([]byte(evA.Data), &subA)
	<-b
	waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
	waitForEvent(t, b, "presence.updated", 200*time.Millisecond)

	resp = postWithCSRF(t, ts.URL+"/api/stream/focus",
		`{"subscriber_id":"`+subA.SubscriberID+`","topic_id":"`+created.ID+`"}`)
	if resp.StatusCode != http.StatusNoContent {
		t.Fatalf("focus status = %d", resp.StatusCode)
	}
	resp.Body.Close()

	// B's stream should observe a presence frame including A's focus.
	ev := waitForEvent(t, b, "presence.updated", 200*time.Millisecond)
	if !strings.Contains(ev.Data, `"focused_topic_id":"`+created.ID+`"`) {
		t.Fatalf("B missing focus in presence: %s", ev.Data)
	}
}
go test ./internal/server/ -run 'TestE2E_' -v

Expected: all pass.

git add internal/server/e2e_realtime_test.go
git commit -m "wiki-browser: server — e2e multi-client realtime tests"

Task 16: Build + vet smoke

A final pre-merge check.

go test ./... -count=1

Expected: zero failures.

go vet ./...

Expected: clean.

make build

Expected: produces dist/wiki-browser and dist/wb-agent.

make build-arm64

Expected: produces dist/wiki-browser-arm64 and dist/wb-agent-arm64.

If the suite is green and the binaries build, no commit is needed. If you had to fix a stray import or test, commit with wiki-browser: clean up after #6 implementation.


What's NOT in this plan (and why)


Self-review notes (do not delete)

Coverage spot-check against the spec sections: