For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add an in-process pub/sub hub plus two new HTTP endpoints (GET /api/stream, POST /api/stream/focus) so two collaborators see Topic activity, agent-job state, and presence on the same Document within ~1 s.
Architecture: New internal/realtime package holds a goroutine-safe hub keyed by source_path. The SSE handler subscribes a client, writes a one-field subscribed handshake frame, activates the subscription for fanout, then streams events. Existing mutation handlers and the agent runner call Hub.Publish after their SQLite write commits — events are advisory; REST endpoints remain authoritative. No SQL migration.
Tech Stack: Go 1.x stdlib (net/http, http.Flusher, sync.RWMutex, channels, context), existing internal/auth, internal/collab, internal/agent packages. modernc.org/sqlite driver (unchanged).
Spec: ../specs/2026-05-13-realtime-collab-design.html. Decisions log: ../specs/2026-05-10-collaborative-annotations-decisions.md.
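The "publish after the SQLite write commits" rule from the Architecture note can be sketched in isolation. This is a minimal illustration under stated assumptions, not code from the plan: `commitThenPublish`, `publishedAfter`, and `errWriteFailed` are hypothetical names, with `commit` standing in for a handler's SQLite write and `publish` for its call to Hub.Publish.

```go
package main

import (
	"errors"
	"fmt"
)

// errWriteFailed is a hypothetical error used only to simulate a failed write.
var errWriteFailed = errors.New("write failed")

// commitThenPublish runs the durable write first and announces it only when
// the write succeeded. Both parameters are stand-ins: commit for the handler's
// SQLite transaction, publish for a call to Hub.Publish.
func commitThenPublish(commit func() error, publish func()) error {
	if err := commit(); err != nil {
		// Nothing was persisted, so nothing is announced.
		return err
	}
	publish()
	return nil
}

// publishedAfter reports whether publish ran for a commit returning commitErr.
func publishedAfter(commitErr error) bool {
	published := false
	_ = commitThenPublish(
		func() error { return commitErr },
		func() { published = true },
	)
	return published
}

func main() {
	fmt.Println("after successful commit:", publishedAfter(nil))
	fmt.Println("after failed commit:", publishedAfter(errWriteFailed))
}
```

Because events are advisory, a client that misses one still converges by refetching from the REST endpoints; the ordering above only ensures an event never describes a write that did not happen.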
New:
internal/realtime/hub.go — Hub, Subscriber, Event, Principal, ErrHubClosed. Goroutine-safe; ~250 LoC.
internal/realtime/hub_test.go — unit tests for fanout, slow-consumer drop, no cross-source leak, Subscribe/Activate split, presence recompute, close semantics.
internal/server/handler_stream.go — handleStream (SSE) and handleStreamFocus (POST). ~200 LoC.
internal/server/handler_stream_test.go — integration tests with httptest.Server and a small SSE reader helper.
internal/server/e2e_realtime_test.go — multi-client end-to-end: two simulated browsers, mutations propagate as expected events in the right order.
Modified:
internal/server/server.go — new routes; Realtime added to Deps.
internal/server/topics.go — handleCreateTopic, handleAppendTopicMessage call Realtime.Publish after the SQLite write commits.
internal/server/handler_proposals.go — handleDiscardTopic, handleIncorporate call Realtime.Publish after the SQLite write commits.
internal/agent/service.go — after CompleteJob, publish topic.message_appended → proposal.created → job.updated for successful incorporate jobs; just job.updated for failures.
internal/agent/service.go constructor + cmd/wiki-browser/main.go — pass the hub into the service.
cmd/wiki-browser/main.go — construct the hub before the server mux; close it during shutdown.
Deliberately not changed in this plan (owned by #8):
internal/server/static/chrome.js — client-side wiring lives with the UI integration sub-project.
internal/server/templates/shell.html — no chrome changes.
/home/volrath/code/orcha/wiki-browser/. Run go test, go vet, make build from that dir.
go test ./internal/realtime/... ./internal/server/... ./internal/agent/... runs the relevant slice.
git log --oneline -5):
wiki-browser: realtime — <thing> for hub work.
wiki-browser: server — <thing> for HTTP handler work.
wiki-browser: agent — <thing> for runner work.
newTestServer in internal/server/handler_doc_test.go already provisions a temp collab.Store with daniel@getorcha.com seeded into users. Use it.
Event, Principal, ErrHubClosed
Files:
internal/realtime/hub.go
internal/realtime/hub_test.go
This task establishes the types the rest of the plan refers to. No behaviour yet, just enough surface for a compile-time check.
// internal/realtime/hub_test.go
package realtime_test
import (
"errors"
"testing"
"github.com/getorcha/wiki-browser/internal/realtime"
)
func TestErrHubClosed_isSentinel(t *testing.T) {
if !errors.Is(realtime.ErrHubClosed, realtime.ErrHubClosed) {
t.Fatal("ErrHubClosed must satisfy errors.Is against itself")
}
if realtime.ErrHubClosed.Error() == "" {
t.Fatal("ErrHubClosed must have a non-empty message")
}
}
func TestEvent_zeroValueCompiles(t *testing.T) {
var e realtime.Event
_ = e.Type
_ = e.Payload
}
func TestPrincipal_zeroValueCompiles(t *testing.T) {
var p realtime.Principal
_ = p.UserID
_ = p.DisplayName
_ = p.SessionID
}
go test ./internal/realtime/ -run TestErrHubClosed -v
Expected: FAIL at build time. At this point internal/realtime contains only hub_test.go, so the realtime package that the test imports has no non-test Go files yet.
// internal/realtime/hub.go
package realtime
import "errors"
// Event is a single SSE frame published to subscribers. Payload must marshal
// to JSON; the SSE handler encodes it inline.
type Event struct {
Type string
Payload any
}
// Principal carries the identity of the SSE-stream consumer. Lives here (not
// imported from internal/auth) so the realtime package stays independent of
// auth's package shape. The SSE handler maps from auth.Principal +
// auth.SessionInfo into this struct on subscribe.
type Principal struct {
UserID string
DisplayName string
SessionID string
}
// ErrHubClosed is returned by Hub.Subscribe after Hub.Close has been called.
var ErrHubClosed = errors.New("realtime: hub closed")
go test ./internal/realtime/ -v
Expected: PASS (three tests).
git add internal/realtime/
git commit -m "wiki-browser: realtime — package skeleton (Event, Principal, ErrHubClosed)"
Hub with Subscribe / Activate / Publish and one-way fanout
The Subscribe/Activate split is load-bearing: it guarantees the SSE handler can write the subscribed handshake frame before any other event reaches the channel.
Files:
Modify: internal/realtime/hub.go
Modify: internal/realtime/hub_test.go
Step 1: Write the failing test
Append to internal/realtime/hub_test.go:
import (
// ... keep existing imports ...
"sync"
"time"
)
func newPrincipal(userID, session string) realtime.Principal {
return realtime.Principal{UserID: userID, DisplayName: userID, SessionID: session}
}
func drain(t *testing.T, ch <-chan realtime.Event, want int, timeout time.Duration) []realtime.Event {
t.Helper()
got := make([]realtime.Event, 0, want)
deadline := time.After(timeout)
for len(got) < want {
select {
case ev, ok := <-ch:
if !ok {
return got
}
got = append(got, ev)
case <-deadline:
t.Fatalf("timed out waiting for %d events, got %d: %+v", want, len(got), got)
}
}
return got
}
func TestHub_PublishReachesActivatedSubscriber(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
sub, err := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
if err != nil {
t.Fatal(err)
}
hub.Activate(sub)
hub.Publish("a.md", realtime.Event{Type: "topic.created", Payload: map[string]string{"topic_id": "t1"}})
evts := drain(t, sub.Events(), 1, 100*time.Millisecond)
if evts[0].Type != "topic.created" {
t.Fatalf("got %q, want topic.created", evts[0].Type)
}
}
func TestHub_PublishBeforeActivateIsDropped(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
hub.Publish("a.md", realtime.Event{Type: "topic.created"})
// No Activate — the event must not be in the channel.
select {
case ev := <-sub.Events():
t.Fatalf("unexpected event before Activate: %+v", ev)
case <-time.After(20 * time.Millisecond):
// good
}
}
func TestHub_NoCrossSourceLeak(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
a, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
b, _ := hub.Subscribe("b.md", newPrincipal("u2", "s2"))
hub.Activate(a)
hub.Activate(b)
hub.Publish("a.md", realtime.Event{Type: "topic.created"})
got := drain(t, a.Events(), 1, 100*time.Millisecond)
if got[0].Type != "topic.created" {
t.Fatalf("a got %q", got[0].Type)
}
select {
case ev := <-b.Events():
t.Fatalf("b received cross-source event: %+v", ev)
case <-time.After(20 * time.Millisecond):
}
}
func TestHub_FanoutToMultipleSubscribersOnSameSource(t *testing.T) {
	hub := realtime.NewHub()
	defer hub.Close()
	a, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
	b, _ := hub.Subscribe("a.md", newPrincipal("u2", "s2"))
	hub.Activate(a)
	hub.Activate(b)
	hub.Publish("a.md", realtime.Event{Type: "topic.created"})
	var wg sync.WaitGroup
	for _, s := range []*realtime.Subscriber{a, b} {
		wg.Add(1)
		go func(s *realtime.Subscriber) {
			defer wg.Done()
			// drain calls t.Fatalf, which must only run on the test
			// goroutine, so wait inline and report with t.Errorf.
			select {
			case <-s.Events():
			case <-time.After(100 * time.Millisecond):
				t.Errorf("subscriber %s timed out waiting for an event", s.ID())
			}
		}(s)
	}
	wg.Wait()
}
func TestHub_SubscribeAfterCloseFails(t *testing.T) {
	hub := realtime.NewHub()
	hub.Close()
	// Compare against the sentinel with errors.Is rather than by message text.
	if _, err := hub.Subscribe("a.md", newPrincipal("u1", "s1")); !errors.Is(err, realtime.ErrHubClosed) {
		t.Fatalf("Subscribe after Close: err=%v, want ErrHubClosed", err)
	}
}
go test ./internal/realtime/ -v
Expected: build failure (undefined: realtime.NewHub, realtime.Subscriber, etc.).
Replace internal/realtime/hub.go with:
// internal/realtime/hub.go
package realtime
import (
"errors"
"sync"
"github.com/google/uuid"
)
// Event is a single SSE frame published to subscribers.
type Event struct {
Type string
Payload any
}
// Principal carries the identity of the SSE-stream consumer.
type Principal struct {
UserID string
DisplayName string
SessionID string
}
// ErrHubClosed is returned by Subscribe after Close has been called.
var ErrHubClosed = errors.New("realtime: hub closed")
// subscriberBufferSize is the per-subscriber event channel capacity. See the
// "Backpressure" section of the design spec for the rationale (64 absorbs the
// 3-event burst of an Agent incorporation plus prior running transitions and
// presence churn, with ample headroom; 16 KiB per subscriber stays trivial on
// the Pi).
const subscriberBufferSize = 64
// Hub is the in-process pub/sub fanout. Goroutine-safe. Keyed by source_path.
type Hub struct {
mu sync.RWMutex
closed bool
sources map[string]map[*Subscriber]struct{}
}
// Subscriber represents one live SSE stream. Owned by the calling handler;
// must be closed when the handler returns.
type Subscriber struct {
id string
source string
princ Principal
mu sync.Mutex
focusedTopicID string
events chan Event
closed bool
activated bool
hub *Hub
}
func NewHub() *Hub {
return &Hub{sources: map[string]map[*Subscriber]struct{}{}}
}
// Subscribe allocates a Subscriber for sourcePath but does NOT register it for
// fanout. The caller writes a `subscribed` handshake frame onto the SSE
// stream, then calls Activate. Splitting the two operations guarantees
// `subscribed` is the first frame on the wire — concurrent Publish calls on
// the same source cannot deliver into this Subscriber's channel until
// Activate runs.
func (h *Hub) Subscribe(sourcePath string, p Principal) (*Subscriber, error) {
h.mu.RLock()
closed := h.closed
h.mu.RUnlock()
if closed {
return nil, ErrHubClosed
}
return &Subscriber{
id: uuid.NewString(),
source: sourcePath,
princ: p,
events: make(chan Event, subscriberBufferSize),
hub: h,
}, nil
}
// Activate joins the subscriber to the source's fanout set.
func (h *Hub) Activate(s *Subscriber) {
h.mu.Lock()
defer h.mu.Unlock()
if h.closed {
return
}
s.mu.Lock()
s.activated = true
s.mu.Unlock()
set := h.sources[s.source]
if set == nil {
set = map[*Subscriber]struct{}{}
h.sources[s.source] = set
}
set[s] = struct{}{}
}
// Publish fans an event out to every activated subscriber on sourcePath. It
// never blocks: if a subscriber's buffer is full, the hub closes that
// subscriber asynchronously, the SSE handler returns, and the client
// reconnects and refetches via REST.
func (h *Hub) Publish(sourcePath string, evt Event) {
h.mu.RLock()
set := h.sources[sourcePath]
subs := make([]*Subscriber, 0, len(set))
for s := range set {
subs = append(subs, s)
}
h.mu.RUnlock()
for _, s := range subs {
s.deliver(evt)
}
}
// Close shuts down the hub. After Close, Subscribe returns ErrHubClosed.
// Existing subscribers' channels are closed.
func (h *Hub) Close() {
h.mu.Lock()
if h.closed {
h.mu.Unlock()
return
}
h.closed = true
subs := []*Subscriber{}
for _, set := range h.sources {
for s := range set {
subs = append(subs, s)
}
}
h.sources = map[string]map[*Subscriber]struct{}{}
h.mu.Unlock()
for _, s := range subs {
s.closeChan()
}
}
// ID returns the server-generated subscriber_id sent in the `subscribed` event.
func (s *Subscriber) ID() string { return s.id }
// Source returns the source_path this subscription is bound to.
func (s *Subscriber) Source() string { return s.source }
// Principal returns the bound Principal (read-only).
func (s *Subscriber) Principal() Principal { return s.princ }
// Events is the read-only stream the SSE handler ranges over.
func (s *Subscriber) Events() <-chan Event { return s.events }
// Close removes the subscriber from the hub's fanout set and closes its
// channel. Idempotent. Safe to call from the SSE handler's defer.
func (s *Subscriber) Close() {
s.hub.removeSubscriber(s)
}
func (s *Subscriber) deliver(evt Event) {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return
}
select {
case s.events <- evt:
s.mu.Unlock()
default:
// Buffer full → slow consumer. Close and let the SSE handler exit;
// the client reconnects + refetches via REST.
s.mu.Unlock()
go s.hub.removeSubscriber(s)
}
}
func (s *Subscriber) closeChan() {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return
}
s.closed = true
close(s.events)
s.mu.Unlock()
}
func (h *Hub) removeSubscriber(s *Subscriber) {
h.mu.Lock()
if set, ok := h.sources[s.source]; ok {
delete(set, s)
if len(set) == 0 {
delete(h.sources, s.source)
}
}
h.mu.Unlock()
s.closeChan()
}
go test ./internal/realtime/ -v
Expected: all tests pass.
git add internal/realtime/
git commit -m "wiki-browser: realtime — hub with Subscribe/Activate fanout"
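The Subscribe/Activate split can be seen in miniature below. This is a simplified sketch, not the plan's Hub: `miniHub` and `handshakeOrder` are hypothetical names, there is only one source, and events are plain strings. It shows that an event published between subscribe and activate never reaches the channel, so the handshake stays first.

```go
package main

import (
	"fmt"
	"sync"
)

// miniHub is a stripped-down, single-source stand-in for the plan's Hub,
// written only to illustrate the Subscribe/Activate split.
type miniHub struct {
	mu     sync.Mutex
	active map[chan string]struct{}
}

func newMiniHub() *miniHub {
	return &miniHub{active: map[chan string]struct{}{}}
}

// subscribe allocates a buffered channel but does not register it for fanout.
func (h *miniHub) subscribe() chan string {
	return make(chan string, 4)
}

// activate registers the channel so that later publishes reach it.
func (h *miniHub) activate(ch chan string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.active[ch] = struct{}{}
}

// publish delivers to activated channels only, without blocking.
func (h *miniHub) publish(ev string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.active {
		select {
		case ch <- ev:
		default: // buffer full: drop rather than block the publisher
		}
	}
}

// handshakeOrder simulates an SSE handler and returns the frames in the order
// a client would see them.
func handshakeOrder() []string {
	h := newMiniHub()
	ch := h.subscribe()
	h.publish("topic.created")     // arrives before activation: not delivered
	wire := []string{"subscribed"} // the handler writes the handshake itself
	h.activate(ch)
	h.publish("job.updated")
	close(ch) // no further publishes follow, so closing here is safe
	for ev := range ch {
		wire = append(wire, ev)
	}
	return wire
}

func main() {
	fmt.Println(handshakeOrder())
}
```

The early topic.created is lost by design; in the plan the client recovers that state through the authoritative REST endpoints.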
Files:
internal/realtime/hub_test.go
The hub already implements drop-on-full in Task 2's deliver. This task tests the contract explicitly.
Append to internal/realtime/hub_test.go:
func TestHub_SlowConsumerDropped(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
hub.Activate(sub)
// Fill the buffer (64) without reading.
for i := 0; i < 64; i++ {
hub.Publish("a.md", realtime.Event{Type: "x"})
}
// One more should trigger backpressure → close.
hub.Publish("a.md", realtime.Event{Type: "overflow"})
// Drain whatever fit, then verify the channel closes.
deadline := time.After(200 * time.Millisecond)
gotClose := false
for !gotClose {
select {
case _, ok := <-sub.Events():
if !ok {
gotClose = true
}
case <-deadline:
t.Fatal("subscriber channel never closed after overflow")
}
}
}
func TestHub_PublishNeverBlocks(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
sub, _ := hub.Subscribe("a.md", newPrincipal("u1", "s1"))
hub.Activate(sub)
done := make(chan struct{})
go func() {
for i := 0; i < 1000; i++ {
hub.Publish("a.md", realtime.Event{Type: "x"})
}
close(done)
}()
select {
case <-done:
case <-time.After(500 * time.Millisecond):
t.Fatal("Publish blocked on a slow subscriber")
}
}
go test ./internal/realtime/ -run 'TestHub_(SlowConsumerDropped|PublishNeverBlocks)' -v
Expected: both pass (impl from Task 2 already satisfies them). If either fails, fix the implementation before continuing — backpressure is correctness, not nice-to-have.
git add internal/realtime/hub_test.go
git commit -m "wiki-browser: realtime — backpressure tests"
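The backpressure contract tested above rests on Go's non-blocking send idiom (`select` with a `default` branch). The following is a minimal sketch of that idiom on its own; `trySend` and `fillUntilFull` are hypothetical helper names and the channel carries ints rather than events.

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the buffer is
// full, which is the condition the hub treats as a slow consumer.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// fillUntilFull sends into a channel of the given capacity that nobody reads
// from, and returns how many sends succeeded before the first refusal.
func fillUntilFull(capacity int) int {
	ch := make(chan int, capacity)
	sent := 0
	for trySend(ch, sent) {
		sent++
	}
	return sent
}

func main() {
	fmt.Println("sends accepted by a 64-slot buffer:", fillUntilFull(64))
}
```

With an unread buffer of capacity N, exactly N sends succeed and the next one is refused immediately, which is why the test publishes 64 events and then one more.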
SetFocus and presence recompute on subscribe/activate/close
Files:
internal/realtime/hub.go
internal/realtime/hub_test.go
Presence is emitted as presence.updated events with payload {subscriptions: [...]}, one entry per active subscription on the source. The hub recomputes and republishes on every Activate, SetFocus, and removeSubscriber.
Append to hub_test.go:
type presencePayload struct {
Subscriptions []presenceEntry `json:"subscriptions"`
}
type presenceEntry struct {
SubscriberID string `json:"subscriber_id"`
UserID string `json:"user_id"`
DisplayName string `json:"display_name"`
FocusedTopicID string `json:"focused_topic_id,omitempty"`
}
func presenceFrom(t *testing.T, ev realtime.Event) presencePayload {
t.Helper()
if ev.Type != "presence.updated" {
t.Fatalf("not presence.updated: %s", ev.Type)
}
p, ok := ev.Payload.(realtime.PresencePayload)
if !ok {
t.Fatalf("payload type = %T, want realtime.PresencePayload", ev.Payload)
}
out := presencePayload{}
for _, s := range p.Subscriptions {
out.Subscriptions = append(out.Subscriptions, presenceEntry{
SubscriberID: s.SubscriberID, UserID: s.UserID,
DisplayName: s.DisplayName, FocusedTopicID: s.FocusedTopicID,
})
}
return out
}
func TestHub_PresenceEmittedOnActivate(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
hub.Activate(a)
evts := drain(t, a.Events(), 1, 100*time.Millisecond)
p := presenceFrom(t, evts[0])
if len(p.Subscriptions) != 1 || p.Subscriptions[0].UserID != "daniel@x" {
t.Fatalf("snapshot = %+v", p)
}
}
func TestHub_PresenceRecomputeOnSecondActivate(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
hub.Activate(a)
drain(t, a.Events(), 1, 100*time.Millisecond) // initial snapshot
b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
hub.Activate(b)
// Both subscribers receive an updated snapshot.
got := drain(t, a.Events(), 1, 100*time.Millisecond)
pA := presenceFrom(t, got[0])
if len(pA.Subscriptions) != 2 {
t.Fatalf("after second Activate, a saw %d subs", len(pA.Subscriptions))
}
got = drain(t, b.Events(), 1, 100*time.Millisecond)
pB := presenceFrom(t, got[0])
if len(pB.Subscriptions) != 2 {
t.Fatalf("after second Activate, b saw %d subs", len(pB.Subscriptions))
}
}
func TestHub_SetFocusRepublishesPresence(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
hub.Activate(a)
hub.Activate(b)
// Drain the two initial snapshots from both subscribers.
drain(t, a.Events(), 2, 100*time.Millisecond)
drain(t, b.Events(), 1, 100*time.Millisecond)
a.SetFocus("topic-123")
got := drain(t, b.Events(), 1, 100*time.Millisecond)
p := presenceFrom(t, got[0])
var aEntry presenceEntry
for _, e := range p.Subscriptions {
if e.UserID == "daniel@x" {
aEntry = e
}
}
if aEntry.FocusedTopicID != "topic-123" {
t.Fatalf("after SetFocus, daniel's focus = %q, want topic-123", aEntry.FocusedTopicID)
}
}
func TestHub_RemoveSubscriberRepublishesPresence(t *testing.T) {
hub := realtime.NewHub()
defer hub.Close()
a, _ := hub.Subscribe("a.md", newPrincipal("daniel@x", "s1"))
b, _ := hub.Subscribe("a.md", newPrincipal("max@y", "s2"))
hub.Activate(a)
hub.Activate(b)
drain(t, a.Events(), 2, 100*time.Millisecond)
drain(t, b.Events(), 1, 100*time.Millisecond)
a.Close()
got := drain(t, b.Events(), 1, 100*time.Millisecond)
p := presenceFrom(t, got[0])
if len(p.Subscriptions) != 1 || p.Subscriptions[0].UserID != "max@y" {
t.Fatalf("after a.Close, b saw %+v", p.Subscriptions)
}
}
go test ./internal/realtime/ -v
Expected: build failure (undefined: realtime.PresencePayload, (*Subscriber).SetFocus).
Add to internal/realtime/hub.go:
// PresencePayload is the JSON-marshalable payload of presence.updated events.
// Each entry corresponds to one live subscription — two tabs from the same
// user produce two entries with the same user_id.
type PresencePayload struct {
Subscriptions []PresenceEntry `json:"subscriptions"`
}
type PresenceEntry struct {
SubscriberID string `json:"subscriber_id"`
UserID string `json:"user_id"`
DisplayName string `json:"display_name"`
FocusedTopicID string `json:"focused_topic_id,omitempty"`
}
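To make the wire shape of these types concrete, here is a small sketch that marshals a payload with encoding/json. The two struct definitions are copied from the plan; `encodePresence`, `samplePresence`, and the sample values are illustrative assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PresenceEntry and PresencePayload mirror the definitions in the plan.
type PresenceEntry struct {
	SubscriberID   string `json:"subscriber_id"`
	UserID         string `json:"user_id"`
	DisplayName    string `json:"display_name"`
	FocusedTopicID string `json:"focused_topic_id,omitempty"`
}

type PresencePayload struct {
	Subscriptions []PresenceEntry `json:"subscriptions"`
}

// encodePresence returns the JSON encoding, or "" if marshalling fails.
func encodePresence(p PresencePayload) string {
	b, err := json.Marshal(p)
	if err != nil {
		return ""
	}
	return string(b)
}

// samplePresence models two tabs of the same user, only one with a focus.
func samplePresence() PresencePayload {
	return PresencePayload{Subscriptions: []PresenceEntry{
		{SubscriberID: "sub-1", UserID: "daniel@x", DisplayName: "Daniel", FocusedTopicID: "topic-123"},
		{SubscriberID: "sub-2", UserID: "daniel@x", DisplayName: "Daniel"},
	}}
}

func main() {
	fmt.Println(encodePresence(samplePresence()))
	// An empty but non-nil slice encodes as [], a nil slice as null.
	fmt.Println(encodePresence(PresencePayload{Subscriptions: []PresenceEntry{}}))
	fmt.Println(encodePresence(PresencePayload{}))
}
```

Two details are worth noticing: `omitempty` drops focused_topic_id for an unfocused subscription, and a nil Subscriptions slice would encode as null rather than []. Building the slice with make, as publishPresence does, keeps the field an array even when no subscribers remain.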
Modify (*Hub).Activate to call presence recompute at the end:
func (h *Hub) Activate(s *Subscriber) {
h.mu.Lock()
if h.closed {
h.mu.Unlock()
return
}
s.mu.Lock()
s.activated = true
s.mu.Unlock()
set := h.sources[s.source]
if set == nil {
set = map[*Subscriber]struct{}{}
h.sources[s.source] = set
}
set[s] = struct{}{}
h.mu.Unlock()
h.publishPresence(s.source)
}
Modify (*Hub).removeSubscriber to publish after removal:
func (h *Hub) removeSubscriber(s *Subscriber) {
h.mu.Lock()
source := s.source
if set, ok := h.sources[source]; ok {
delete(set, s)
if len(set) == 0 {
delete(h.sources, source)
}
}
h.mu.Unlock()
s.closeChan()
h.publishPresence(source)
}
Add SetFocus to Subscriber and publishPresence to Hub:
// SetFocus updates the subscription's focused Topic and triggers a presence
// republish on this source.
func (s *Subscriber) SetFocus(topicID string) {
s.mu.Lock()
s.focusedTopicID = topicID
activated := s.activated
s.mu.Unlock()
if activated {
s.hub.publishPresence(s.source)
}
}
func (h *Hub) publishPresence(sourcePath string) {
h.mu.RLock()
set := h.sources[sourcePath]
entries := make([]PresenceEntry, 0, len(set))
subs := make([]*Subscriber, 0, len(set))
for s := range set {
s.mu.Lock()
entries = append(entries, PresenceEntry{
SubscriberID: s.id,
UserID: s.princ.UserID,
DisplayName: s.princ.DisplayName,
FocusedTopicID: s.focusedTopicID,
})
s.mu.Unlock()
subs = append(subs, s)
}
h.mu.RUnlock()
evt := Event{Type: "presence.updated", Payload: PresencePayload{Subscriptions: entries}}
for _, s := range subs {
s.deliver(evt)
}
}
NOTE on ordering. The presence entries' order is unstable (map iteration). Tests must not rely on order; they filter by UserID. If a deterministic order is ever needed for the wire, sort by SubscriberID inside publishPresence, but v1 does not require it.
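Should a stable wire order become necessary, the sort mentioned in the note could look roughly like this. It is a sketch only: `sortBySubscriberID` and `sortedIDs` are hypothetical helpers, and the entry type is trimmed to the fields the example needs.

```go
package main

import (
	"fmt"
	"sort"
)

// PresenceEntry is trimmed to the fields this example needs.
type PresenceEntry struct {
	SubscriberID string
	UserID       string
}

// sortBySubscriberID orders entries in place by SubscriberID. Subscriber IDs
// are unique per subscription, so the resulting order is fully determined.
func sortBySubscriberID(entries []PresenceEntry) {
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].SubscriberID < entries[j].SubscriberID
	})
}

// sortedIDs builds entries from the given IDs, sorts them, and returns the
// IDs in their final order.
func sortedIDs(ids ...string) []string {
	entries := make([]PresenceEntry, 0, len(ids))
	for _, id := range ids {
		entries = append(entries, PresenceEntry{SubscriberID: id})
	}
	sortBySubscriberID(entries)
	out := make([]string, 0, len(entries))
	for _, e := range entries {
		out = append(out, e.SubscriberID)
	}
	return out
}

func main() {
	fmt.Println(sortedIDs("c", "a", "b"))
}
```

In publishPresence the call would go after the map iteration and before the event is built, outside the hub lock.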
go test ./internal/realtime/ -v
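Expected: the new presence tests pass. Activate now delivers a presence.updated frame before anything else, so the Task 2 tests that assert on the first received event (TestHub_PublishReachesActivatedSubscriber, TestHub_NoCrossSourceLeak) fail until they skip presence.updated frames before asserting. Make that change in this step so the whole package passes before committing.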
git add internal/realtime/
git commit -m "wiki-browser: realtime — presence model (per-subscription, recomputed)"
Realtime into Deps, cmd/wiki-browser/main.go, and agent.Service
This task gets the hub instantiated and threaded through the dependency graph so subsequent tasks can use it. Smoke test: the existing test suite still passes.
Files:
Modify: internal/server/server.go
Modify: internal/server/handler_doc_test.go
Modify: internal/agent/service.go
Modify: cmd/wiki-browser/main.go
Step 1: Write the failing test
Append to internal/server/handler_doc_test.go:
import (
// existing imports …
"github.com/getorcha/wiki-browser/internal/realtime"
)
func TestNewTestServer_acceptsRealtimeHub(t *testing.T) {
// Smoke check that passing a hub through Deps compiles and the existing
// routes still respond. This is a regression guard for the wiring change.
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
})
resp, err := http.Get(ts.URL + "/")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
t.Fatalf("status = %d", resp.StatusCode)
}
}
go test ./internal/server/ -run TestNewTestServer_acceptsRealtimeHub -v
Expected: build failure (testServerOptions.Realtime undefined, Deps.Realtime undefined).
Realtime to Deps in internal/server/server.go
Modify the Deps struct (around line 14):
type Deps struct {
Title string
Root string
Walker *walker.Walker
Index *index.Index
Cache *render.Cache
Collab *collab.Store
Auth *auth.Handlers
SessionMiddleware func(http.Handler) http.Handler
AuthDevMode bool
AgentService AgentService
// Realtime is the in-process pub/sub hub for SSE. Nil disables
// /api/stream + /api/stream/focus with 503.
Realtime *realtime.Hub
}
Add the internal/realtime import to the imports block.
Realtime to testServerOptions and pass it through Mux
In internal/server/handler_doc_test.go, extend testServerOptions:
type testServerOptions struct {
SessionMiddleware func(http.Handler) http.Handler
AgentService server.AgentService
Realtime *realtime.Hub
}
Pass it into server.Mux(...):
mux := server.Mux(server.Deps{
// … existing fields …
Realtime: opts.Realtime,
})
agent.ServiceConfig and agent.Service
In internal/agent/service.go, extend ServiceConfig:
type ServiceConfig struct {
// … existing fields …
// Realtime is the hub the runner publishes job + proposal events on.
// Nil disables event publishing (existing tests don't need it; production
// always passes one).
Realtime *realtime.Hub
}
Add internal/realtime import. Add a helper:
func (s *Service) publish(sourcePath string, evt realtime.Event) {
if s.cfg.Realtime == nil {
return
}
s.cfg.Realtime.Publish(sourcePath, evt)
}
(Tasks 9–11 will call this from (*Service).run.)
cmd/wiki-browser/main.go
After collabStore is opened (around line 116) and before the agent service is constructed:
hub := realtime.NewHub()
defer hub.Close()
Defer-order note. run() returns through srv.Shutdown(...) in the signal-handler branch, then unwinds its deferred functions in LIFO order before returning to main. The hub must shut down after Shutdown drains in-flight SSE handlers; otherwise the handlers' defer sub.Close() would race a hub that's already torn down. Placing defer hub.Close() between the collabStore open and the agentSvc construction registers it earlier than defer agentSvc.Stop(), so LIFO runs agentSvc.Stop() first (draining in-flight agent jobs that may still call Hub.Publish) and hub.Close() afterwards. That's the right order. Don't introduce a defer srv.Shutdown(...): the existing select-branch path already calls it.
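The LIFO behaviour the note relies on can be checked with a few lines of Go. This sketch does not touch the real services: `shutdownOrder` is a hypothetical function and the strings merely label the two deferred calls discussed above.

```go
package main

import "fmt"

// shutdownOrder registers two deferred calls in the order the plan describes
// and returns the order in which they actually ran.
func shutdownOrder() (order []string) {
	record := func(step string) { order = append(order, step) }
	defer record("hub.Close")     // registered first, so it runs last
	defer record("agentSvc.Stop") // registered second, so it runs first
	return
}

func main() {
	for i, step := range shutdownOrder() {
		fmt.Printf("%d: %s\n", i+1, step)
	}
}
```

The deferred call registered last runs first, so anything the agent service publishes while stopping still finds an open hub.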
Add the import. Pass Realtime: hub into agent.NewService(agent.ServiceConfig{ ... }) and server.Mux(server.Deps{ ... }).
go test ./internal/server/ ./internal/agent/ ./internal/realtime/ -v
go vet ./...
Expected: all pass, no vet warnings.
make build
Expected: builds dist/wiki-browser and dist/wb-agent without errors.
git add internal/server/server.go internal/server/handler_doc_test.go internal/agent/service.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: wire realtime.Hub into Deps, agent.Service, main"
GET /api/stream — happy path
Implement the SSE handler's happy path: validate source path, validate file exists, subscribe, write subscribed frame, activate, fan out events. Defer keepalive and session-touch handling to the next task.
Files:
Create: internal/server/handler_stream.go
Create: internal/server/handler_stream_test.go
Modify: internal/server/server.go (register route)
Step 1: Write the failing test
Create internal/server/handler_stream_test.go:
package server_test
import (
"bufio"
"encoding/json"
"net/http"
"strings"
"testing"
"time"
"github.com/getorcha/wiki-browser/internal/realtime"
)
// sseEvent is one parsed SSE frame (event: + data: lines).
type sseEvent struct {
Type string
Data string
}
// openSSE opens the stream and returns a channel of parsed events plus a
// cancel func. The channel closes when the response body closes.
func openSSE(t *testing.T, url string) (<-chan sseEvent, func()) {
t.Helper()
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
t.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != http.StatusOK {
body, _ := bufio.NewReader(resp.Body).ReadString('\n')
resp.Body.Close()
t.Fatalf("stream status = %d body=%q", resp.StatusCode, body)
}
out := make(chan sseEvent, 32)
go func() {
defer close(out)
// Body close is owned by the returned cancel func — closing here
// would double-Close.
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64<<10), 1<<20)
var ev sseEvent
for scanner.Scan() {
line := scanner.Text()
switch {
case line == "":
if ev.Type != "" {
out <- ev
ev = sseEvent{}
}
case strings.HasPrefix(line, "event: "):
ev.Type = strings.TrimPrefix(line, "event: ")
case strings.HasPrefix(line, "data: "):
ev.Data = strings.TrimPrefix(line, "data: ")
case strings.HasPrefix(line, ":"):
// comment — keepalive, ignored
}
}
}()
return out, func() { _ = resp.Body.Close() }
}
func waitForEvent(t *testing.T, ch <-chan sseEvent, wantType string, timeout time.Duration) sseEvent {
t.Helper()
deadline := time.After(timeout)
for {
select {
case ev, ok := <-ch:
if !ok {
t.Fatalf("stream closed waiting for %s", wantType)
}
if ev.Type == wantType {
return ev
}
case <-deadline:
t.Fatalf("timeout waiting for %s", wantType)
}
}
}
func TestStream_subscribedFrameFirstThenPresence(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
})
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
// First frame must be `subscribed`.
ev := <-ch
if ev.Type != "subscribed" {
t.Fatalf("first event = %q, want subscribed", ev.Type)
}
var sub struct {
SubscriberID string `json:"subscriber_id"`
}
if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil {
t.Fatalf("decode subscribed: %v", err)
}
if sub.SubscriberID == "" {
t.Fatal("subscriber_id empty")
}
// Followed (in some order, but soon) by a presence.updated frame.
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
}
func TestStream_badSourcePathReturns400(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
})
resp, err := http.Get(ts.URL + "/api/stream?source_path=../escape.md")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusBadRequest {
t.Fatalf("status = %d, want 400", resp.StatusCode)
}
}
func TestStream_missingFileReturns404(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
})
resp, err := http.Get(ts.URL + "/api/stream?source_path=does-not-exist.md")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("status = %d, want 404", resp.StatusCode)
}
}
func TestStream_anonymousReturns401(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: nil,
Realtime: hub,
})
resp, err := http.Get(ts.URL + "/api/stream?source_path=a.md")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("status = %d, want 401", resp.StatusCode)
}
}
func TestStream_realtimeNilReturns503(t *testing.T) {
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: nil,
})
resp, err := http.Get(ts.URL + "/api/stream?source_path=a.md")
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("status = %d, want 503", resp.StatusCode)
}
}
go test ./internal/server/ -run TestStream -v
Expected: 404 on /api/stream (route not registered).
Create internal/server/handler_stream.go:
package server
import (
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"github.com/getorcha/wiki-browser/internal/auth"
"github.com/getorcha/wiki-browser/internal/collab"
"github.com/getorcha/wiki-browser/internal/realtime"
)
// handleStream implements GET /api/stream?source_path=…
//
// SSE protocol: one HTTP response, kept open. Frames are
//
// event: <type>
// data: <JSON payload>
//
// followed by a blank line. Keepalive uses `:keepalive` comment lines. The
// stream never emits `id:` lines — v1 has no replay log.
func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
if d.Realtime == nil || d.Collab == nil {
writeJSONError(w, http.StatusServiceUnavailable, "collab_unavailable")
return
}
sourcePath := r.URL.Query().Get("source_path")
if _, err := collab.ValidateSourcePath(sourcePath); err != nil {
writeJSONError(w, http.StatusBadRequest, "bad_source_path")
return
}
abs := filepath.Join(d.Root, sourcePath)
if info, err := os.Stat(abs); err != nil || info.IsDir() {
writeJSONError(w, http.StatusNotFound, "unknown_source")
return
}
flusher, ok := w.(http.Flusher)
if !ok {
writeJSONError(w, http.StatusInternalServerError, "no_flusher")
return
}
principal, _ := auth.PrincipalFrom(r.Context())
sessionInfo, _ := auth.SessionFrom(r.Context())
sub, err := d.Realtime.Subscribe(sourcePath, realtime.Principal{
UserID: principal.UserID,
DisplayName: principal.DisplayName,
SessionID: sessionInfo.IDHash,
})
if err != nil {
writeJSONError(w, http.StatusServiceUnavailable, "hub_closed")
return
}
defer sub.Close()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-store")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
// Handshake — `subscribed` must be the first frame on the wire. Write
// before Activate so Publish-from-elsewhere cannot race in ahead of it.
if err := writeSSEEvent(w, "subscribed", map[string]string{"subscriber_id": sub.ID()}); err != nil {
return
}
flusher.Flush()
d.Realtime.Activate(sub)
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case ev, ok := <-sub.Events():
if !ok {
return
}
if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
return
}
flusher.Flush()
}
}
}
// writeSSEEvent serialises one SSE frame. `event:` + `data:` only, never `id:`,
// because v1 has no replay log. The payload is marshalled before anything is
// written, so a marshal failure cannot leave a half-written frame on the wire.
func writeSSEEvent(w http.ResponseWriter, evType string, payload any) error {
b, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal event %s: %w", evType, err)
}
_, err = fmt.Fprintf(w, "event: %s\ndata: %s\n\n", evType, b)
return err
}
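The frame format documented on handleStream can be read back with a small line-oriented parser. The following is a minimal sketch of such a reader, assuming single-line `data:` payloads as produced by json.Marshal; the names sseFrame, parseSSE and sampleStream are hypothetical and are not the plan's openSSE test helper.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sseFrame is one parsed frame: the `event:` type and the `data:` payload.
type sseFrame struct {
	Event string
	Data  string
}

// parseSSE reads frames until EOF. Comment lines (leading ':') such as
// `:keepalive` are skipped; a blank line terminates the current frame.
func parseSSE(r io.Reader) ([]sseFrame, error) {
	var (
		frames []sseFrame
		cur    sseFrame
		dirty  bool // true once the current frame has at least one field
	)
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "":
			if dirty {
				frames = append(frames, cur)
			}
			cur, dirty = sseFrame{}, false
		case strings.HasPrefix(line, ":"):
			// Comment line, for example a keepalive. Nothing to record.
		case strings.HasPrefix(line, "event:"):
			cur.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
			dirty = true
		case strings.HasPrefix(line, "data:"):
			cur.Data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
			dirty = true
		}
	}
	return frames, sc.Err()
}

// sampleStream is a hand-written wire capture, not real server output.
const sampleStream = "event: subscribed\ndata: {\"subscriber_id\":\"s1\"}\n\n" +
	":keepalive\n\n" +
	"event: topic.created\ndata: {\"topic_id\":\"t1\"}\n\n"

// parseSample parses the hand-written capture above.
func parseSample() []sseFrame {
	frames, err := parseSSE(strings.NewReader(sampleStream))
	if err != nil {
		panic(err)
	}
	return frames
}

func main() {
	for _, f := range parseSample() {
		fmt.Printf("%s %s\n", f.Event, f.Data)
	}
}
```

The keepalive comment produces no frame, which is why a test that waits for a named event never needs to special-case it.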
In internal/server/server.go, in Mux(...), after the /api/agent/jobs/{id} registration:
mux.Handle("GET /api/stream",
d.withSession(auth.RequireCollaborator(http.HandlerFunc(d.handleStream))))
go test ./internal/server/ -run TestStream -v
Expected: all pass.
git add internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/server.go
git commit -m "wiki-browser: server — GET /api/stream SSE handler (happy path)"
The handler must (a) emit a :keepalive comment every 15 s, (b) close the stream when the session is revoked / expired, (c) call TouchSession to extend the sliding expiry so SSE-only observers don't time out.
To make this testable without a 15 s wait, the keepalive interval comes from Deps (config-driven default, test-time override).
Files:
Modify: internal/server/server.go
Modify: internal/server/handler_stream.go
Modify: internal/server/handler_stream_test.go
Modify: internal/server/handler_doc_test.go
Step 1: Write the failing test
Append to internal/server/handler_stream_test.go:
func TestStream_keepaliveCommentEmitted(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 30 * time.Millisecond,
})
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64<<10), 1<<20)
deadline := time.Now().Add(500 * time.Millisecond)
sawKeepalive := false
for scanner.Scan() && time.Now().Before(deadline) {
if strings.HasPrefix(scanner.Text(), ":keepalive") {
sawKeepalive = true
break
}
}
if !sawKeepalive {
t.Fatal("no :keepalive line emitted within 500ms")
}
}
func TestStream_doesNotEmitIDLines(t *testing.T) {
// Reading first frame from the stream and asserting no `id:` line slipped
// in — protects against accidental Last-Event-ID handling.
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64<<10), 1<<20)
deadline := time.Now().Add(200 * time.Millisecond)
for scanner.Scan() && time.Now().Before(deadline) {
line := scanner.Text()
if strings.HasPrefix(line, "id:") {
t.Fatalf("unexpected id: line: %q", line)
}
}
}
Add StreamKeepalive to testServerOptions
In internal/server/handler_doc_test.go:
type testServerOptions struct {
SessionMiddleware func(http.Handler) http.Handler
AgentService server.AgentService
Realtime *realtime.Hub
StreamKeepalive time.Duration
}
Pass it into Deps:
mux := server.Mux(server.Deps{
// … existing fields …
StreamKeepalive: opts.StreamKeepalive,
})
Add "time" to imports.
Add StreamKeepalive to Deps
In internal/server/server.go, add to Deps:
// StreamKeepalive controls the SSE keepalive cadence. Zero means use the
// default (15s). Tests override to make assertions fast.
StreamKeepalive time.Duration
Add "time" to imports.
go test ./internal/server/ -run 'TestStream_keepalive|TestStream_doesNotEmitID' -v
Expected: tests fail (handler doesn't emit keepalive).
Replace the loop in internal/server/handler_stream.go:
import (
// existing imports …
"time"
)
const defaultKeepaliveInterval = 15 * time.Second
func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
// … existing validation, subscribe, headers, subscribed frame, Activate …
keepaliveInterval := d.StreamKeepalive
if keepaliveInterval <= 0 {
keepaliveInterval = defaultKeepaliveInterval
}
ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if _, err := fmt.Fprint(w, ":keepalive\n\n"); err != nil {
return
}
flusher.Flush()
case ev, ok := <-sub.Events():
if !ok {
return
}
if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
return
}
flusher.Flush()
}
}
}
go test ./internal/server/ -run 'TestStream' -v
Expected: keepalive + no-id tests pass; pre-existing TestStream tests still pass.
git add internal/server/server.go internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/handler_doc_test.go
git commit -m "wiki-browser: server — SSE keepalive ticker"
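The keepalive loop from this task can also be exercised in isolation. The sketch below is an assumption-laden stand-in rather than the handler itself: streamKeepalives, countKeepalives and demoCount are hypothetical names, and httptest.ResponseRecorder plays the role of the live connection.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// streamKeepalives writes a `:keepalive` comment every interval until ctx is
// done. A non-positive interval falls back to fallback, mirroring how the
// handler treats a zero StreamKeepalive.
func streamKeepalives(ctx context.Context, w http.ResponseWriter, interval, fallback time.Duration) error {
	flusher, ok := w.(http.Flusher)
	if !ok {
		return fmt.Errorf("response writer does not support flushing")
	}
	if interval <= 0 {
		interval = fallback
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-ticker.C:
			if _, err := fmt.Fprint(w, ":keepalive\n\n"); err != nil {
				return err
			}
			flusher.Flush()
		}
	}
}

// countKeepalives runs the loop against an in-memory recorder for runFor and
// reports how many keepalive comments were written.
func countKeepalives(interval, runFor time.Duration) int {
	rec := httptest.NewRecorder()
	ctx, cancel := context.WithTimeout(context.Background(), runFor)
	defer cancel()
	if err := streamKeepalives(ctx, rec, interval, 15*time.Second); err != nil {
		panic(err)
	}
	return strings.Count(rec.Body.String(), ":keepalive")
}

// demoCount uses a short interval so the run finishes quickly.
func demoCount() int {
	return countKeepalives(10*time.Millisecond, 200*time.Millisecond)
}

func main() {
	fmt.Println("at least one keepalive written:", demoCount() > 0)
}
```

Passing the interval as a parameter is the same idea as the Deps override: production keeps the 15 s default while a test shrinks it to milliseconds.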
Keepalive touches last_seen_at; revoked/expired sessions close the stream
This adds the H5 fix from the code review: a long-lived SSE connection participates in the sliding-session expiry that auth.SessionMiddleware already implements per-request.
Files:
Modify: internal/server/server.go
Modify: internal/server/handler_stream.go
Modify: internal/server/handler_stream_test.go
Modify: internal/server/handler_doc_test.go
The handler needs two things from the surrounding wiring:
auth.SessionStore: LookupSession, TouchSession. Already on collab.Store.
SessionLifetime: the rolling session TTL (so TouchSession can compute the new expires_at).
No clock injection in v1: tests use a short StreamKeepalive (e.g. 30 ms) and read real wall-clock time inside the handler. If a future test needs deterministic timing for the touch-throttle logic, plumb a NowFunc func() time.Time through Deps at that point; it is a one-field addition.
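If the NowFunc field is ever needed, it could look roughly like the sketch below. This is only an illustration of the idea: clockDeps, fakeClock and shouldTouch are hypothetical names, and v1 deliberately ships without any of them.

```go
package main

import (
	"fmt"
	"time"
)

// clockDeps is a hypothetical slice of Deps holding only the clock field.
type clockDeps struct {
	// NowFunc returns the current time. Nil means time.Now.
	NowFunc func() time.Time
}

// now resolves the clock, defaulting to the wall clock.
func (d clockDeps) now() time.Time {
	if d.NowFunc != nil {
		return d.NowFunc()
	}
	return time.Now()
}

// shouldTouch reports whether lastSeen is at least touchInterval old.
func (d clockDeps) shouldTouch(lastSeen time.Time, touchInterval time.Duration) bool {
	return d.now().Sub(lastSeen) >= touchInterval
}

// fakeClock is a manually advanced clock for deterministic tests.
type fakeClock struct{ t time.Time }

func (c *fakeClock) Now() time.Time          { return c.t }
func (c *fakeClock) Advance(d time.Duration) { c.t = c.t.Add(d) }

// touchDecisions evaluates the throttle before and after advancing the fake
// clock by exactly one touch interval.
func touchDecisions() (before, after bool) {
	clock := &fakeClock{t: time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)}
	d := clockDeps{NowFunc: clock.Now}
	lastSeen := clock.Now()
	before = d.shouldTouch(lastSeen, 10*time.Minute)
	clock.Advance(10 * time.Minute)
	after = d.shouldTouch(lastSeen, 10*time.Minute)
	return before, after
}

func main() {
	before, after := touchDecisions()
	fmt.Println(before, after) // false true
}
```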
Append to internal/server/handler_stream_test.go:
import (
// existing imports …
"sync" // for recordingSessionStore's mutex, if not already imported
"github.com/getorcha/wiki-browser/internal/auth"
"github.com/getorcha/wiki-browser/internal/collab"
)
// recordingSessionStore wraps a collab.Store and counts TouchSession calls.
// Per-instance state — never package-level — so each test is isolated.
type recordingSessionStore struct {
inner *collab.Store
mu sync.Mutex
touches int
}
func (r *recordingSessionStore) LookupSession(idHash string, now time.Time) (collab.Session, bool, error) {
return r.inner.LookupSession(idHash, now)
}
func (r *recordingSessionStore) TouchSession(idHash string, seenAt, expiresAt time.Time) error {
r.mu.Lock()
r.touches++
r.mu.Unlock()
return r.inner.TouchSession(idHash, seenAt, expiresAt)
}
func (r *recordingSessionStore) RevokeSession(idHash string, revokedAt time.Time) error {
return r.inner.RevokeSession(idHash, revokedAt)
}
func (r *recordingSessionStore) RotateSessionCSRF(idHash, csrfHash string) error {
return r.inner.RotateSessionCSRF(idHash, csrfHash)
}
func (r *recordingSessionStore) Touches() int {
r.mu.Lock()
defer r.mu.Unlock()
return r.touches
}
func TestStream_keepaliveTouchesSession(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
// newTestServerWithOptionsAndRecordingStore constructs the
// recordingSessionStore itself and hands it back as the fourth return value;
// this avoids any package-level state that would leak across tests in the
// same process.
ts, _, _, rec := newTestServerWithOptionsAndRecordingStore(t, testServerOptions{
Realtime: hub,
StreamKeepalive: 30 * time.Millisecond,
StreamTouchInterval: -1, // negative = touch every tick; zero would select the 10 min default
SessionLifetime: time.Hour,
}, "daniel@getorcha.com")
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
req.AddCookie(&http.Cookie{Name: auth.SessionCookieName, Value: "test-session-token"})
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
// Poll the touch counter rather than sleeping a fixed interval.
deadline := time.Now().Add(500 * time.Millisecond)
for time.Now().Before(deadline) {
if rec.Touches() >= 1 {
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("TouchSession not called within 500ms (touches=%d)", rec.Touches())
}
func TestStream_revokedSessionClosesStream(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, store, _ := newTestServerWithOptionsAndRecordingStore(t, testServerOptions{
Realtime: hub,
StreamKeepalive: 30 * time.Millisecond,
SessionLifetime: time.Hour,
StreamTouchInterval: time.Second, // don't touch every tick
}, "daniel@getorcha.com")
req, _ := http.NewRequest(http.MethodGet, ts.URL+"/api/stream?source_path=a.md", nil)
req.AddCookie(&http.Cookie{Name: auth.SessionCookieName, Value: "test-session-token"})
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
// Revoke the session in the DB.
if err := store.RevokeSession(auth.TokenHash("test-session-token"), time.Now()); err != nil {
t.Fatal(err)
}
// Read the response body in a goroutine that closes a channel on EOF.
// bufio.Scanner blocks on Scan(); a wall-clock loop around it does
// nothing once Scan is already blocked. The goroutine + select is the
// only deadline-aware pattern here.
closed := make(chan struct{})
go func() {
defer close(closed)
scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 64<<10), 1<<20)
for scanner.Scan() {
}
}()
select {
case <-closed:
// good — stream returned EOF
case <-time.After(500 * time.Millisecond):
t.Fatal("stream did not close within 500ms after session revocation")
}
}
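The goroutine-plus-select pattern used in the revocation test works for any blocking reader. Here is a minimal sketch that uses io.Pipe as a stand-in for the streaming response body; drainUntilEOF, closedWithin and demo are hypothetical helper names.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"time"
)

// drainUntilEOF consumes r in a goroutine and closes the returned channel
// once the scanner stops (EOF or read error).
func drainUntilEOF(r io.Reader) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		sc := bufio.NewScanner(r)
		for sc.Scan() {
		}
	}()
	return done
}

// closedWithin reports whether done was closed before the timeout elapsed.
func closedWithin(done <-chan struct{}, timeout time.Duration) bool {
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// demo checks both outcomes: an open stream times out, a closed one is seen.
func demo() (openTimedOut, closedSeen bool) {
	pr, pw := io.Pipe()
	done := drainUntilEOF(pr)

	// The pipe write returns once the draining goroutine has read the bytes.
	_, _ = fmt.Fprint(pw, ":keepalive\n\n")

	// While the writer side is open, Scan stays blocked and the wait times out.
	openTimedOut = !closedWithin(done, 50*time.Millisecond)

	// Closing the writer delivers EOF, which ends the scan loop.
	_ = pw.Close()
	closedSeen = closedWithin(done, time.Second)
	return openTimedOut, closedSeen
}

func main() {
	openTimedOut, closedSeen := demo()
	fmt.Println(openTimedOut, closedSeen)
}
```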
In internal/server/handler_doc_test.go:
import (
// existing imports plus:
"os"
"path/filepath"
"sync"
"github.com/getorcha/wiki-browser/internal/auth"
"github.com/getorcha/wiki-browser/internal/index"
"github.com/getorcha/wiki-browser/internal/render"
"github.com/getorcha/wiki-browser/internal/server"
"github.com/getorcha/wiki-browser/internal/walker"
)
// realSessionMiddlewareForStreamTest builds a SessionMiddleware backed by the
// real store; useful for SSE tests that need to exercise revocation paths.
// Also seeds a session row keyed by "test-session-token" so the cookie path
// resolves.
func realSessionMiddlewareForStreamTest(t *testing.T, store *collab.Store, userID string) func(http.Handler) http.Handler {
t.Helper()
if err := store.CreateSession(collab.Session{
IDHash: auth.TokenHash("test-session-token"),
UserID: userID,
CSRFHash: auth.TokenHash("csrf"),
CreatedAt: time.Now(),
LastSeenAt: time.Now().Add(-time.Hour), // older than SessionTouchInterval → real middleware touches
ExpiresAt: time.Now().Add(time.Hour),
}); err != nil {
t.Fatal(err)
}
return auth.SessionMiddlewareWithCookieSecure(store, time.Now, time.Hour, false)
}
// newTestServerWithOptionsAndRecordingStore is the touch-test variant.
// Instead of reusing newTestServerWithOptions (which opens its own store),
// we build the fixture from scratch so the recordingSessionStore can wrap the
// same store the Deps see.
func newTestServerWithOptionsAndRecordingStore(t *testing.T, opts testServerOptions, userID string) (*httptest.Server, string, *collab.Store, *recordingSessionStore) {
t.Helper()
root := t.TempDir()
if err := os.WriteFile(filepath.Join(root, "a.md"), []byte("# A\n"), 0o644); err != nil {
t.Fatal(err)
}
w, err := walker.New(walker.Options{Root: root, Extensions: []string{".md"}})
if err != nil {
t.Fatal(err)
}
idx, err := index.Open(filepath.Join(t.TempDir(), "i.db"))
if err != nil {
t.Fatal(err)
}
idx.SetRoot(root)
cache := render.NewCache(4 << 20)
idx.SetCache(cache)
t.Cleanup(func() { idx.Close() })
store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = store.Close() })
if err := store.UpsertUser(collab.User{ID: userID, DisplayName: "Daniel"}); err != nil {
t.Fatal(err)
}
rec := &recordingSessionStore{inner: store}
opts.SessionMiddleware = realSessionMiddlewareForStreamTest(t, store, userID)
opts.SessionStore = rec
mux := server.Mux(server.Deps{
Title: "Test", Root: root, Walker: w, Index: idx, Cache: cache, Collab: store,
SessionMiddleware: opts.SessionMiddleware,
AgentService: opts.AgentService,
Realtime: opts.Realtime,
StreamKeepalive: opts.StreamKeepalive,
SessionStore: opts.SessionStore,
SessionLifetime: opts.SessionLifetime,
StreamTouchInterval: opts.StreamTouchInterval,
})
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
return ts, root, store, rec
}
recordingSessionStore.TouchSession already increments its per-instance counter before delegating to the inner store, so no package-level counter helper is needed.
Extend testServerOptions and Deps
In internal/server/handler_doc_test.go:
type testServerOptions struct {
SessionMiddleware func(http.Handler) http.Handler
AgentService server.AgentService
Realtime *realtime.Hub
StreamKeepalive time.Duration
SessionStore auth.SessionStore
SessionLifetime time.Duration
StreamTouchInterval time.Duration
}
In server.go:
// SessionStore lets the stream handler revalidate the session on each
// keepalive tick. Optional: when nil, the handler skips revalidation and
// only emits keepalives.
SessionStore auth.SessionStore
// SessionLifetime is the rolling TTL used when the stream handler touches the
// session. Defaults to 30 days when zero (matching SessionMiddleware).
SessionLifetime time.Duration
// StreamTouchInterval throttles TouchSession calls on keepalive ticks.
// Zero selects auth.SessionTouchInterval (10 min); a negative value touches
// on every tick, which tests use to verify the touch path.
StreamTouchInterval time.Duration
Pass SessionStore: store from cmd/wiki-browser/main.go (use the existing collabStore — it already satisfies auth.SessionStore).
go test ./internal/server/ -run 'TestStream_keepaliveTouchesSession|TestStream_revokedSessionClosesStream' -v
Expected: tests fail (handler ignores session state).
In internal/server/handler_stream.go, refactor the loop so each keepalive tick:
1. Calls LookupSession(sessionInfo.IDHash, now), where now is the tick time; if the session is missing or expired, returns.
2. If the time since session.LastSeenAt is >= StreamTouchInterval, calls TouchSession.
3. Writes :keepalive and flushes.
func (d Deps) handleStream(w http.ResponseWriter, r *http.Request) {
// … existing validation, Subscribe, headers, subscribed frame, Activate …
keepaliveInterval := d.StreamKeepalive
if keepaliveInterval <= 0 {
keepaliveInterval = defaultKeepaliveInterval
}
// Zero selects the package default; a negative value means "touch on every
// tick" (tests use this to exercise the touch path).
touchInterval := d.StreamTouchInterval
switch {
case touchInterval < 0:
touchInterval = 0
case touchInterval == 0:
touchInterval = auth.SessionTouchInterval
}
sessionLifetime := d.SessionLifetime
if sessionLifetime <= 0 {
sessionLifetime = 30 * 24 * time.Hour
}
ticker := time.NewTicker(keepaliveInterval)
defer ticker.Stop()
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case now := <-ticker.C:
if d.SessionStore != nil && sessionInfo.IDHash != "" {
session, ok, err := d.SessionStore.LookupSession(sessionInfo.IDHash, now)
if err != nil || !ok {
return // session revoked or expired
}
if now.Sub(session.LastSeenAt) >= touchInterval {
// A failed touch is non-fatal; same policy as SessionMiddleware.
_ = d.SessionStore.TouchSession(sessionInfo.IDHash, now, now.Add(sessionLifetime))
}
}
}
if _, err := fmt.Fprint(w, ":keepalive\n\n"); err != nil {
return
}
flusher.Flush()
case ev, ok := <-sub.Events():
if !ok {
return
}
if err := writeSSEEvent(w, ev.Type, ev.Payload); err != nil {
return
}
flusher.Flush()
}
}
}
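The per-tick decision (revalidate, maybe touch, then keep the stream open) can be checked without HTTP at all. The sketch below assumes an in-memory store with simplified lookup semantics; memStore, session and onTick are hypothetical names, not the collab.Store API.

```go
package main

import (
	"fmt"
	"time"
)

// session mirrors only the fields the tick logic reads.
type session struct {
	LastSeenAt time.Time
	ExpiresAt  time.Time
	Revoked    bool
}

// memStore is an in-memory stand-in for a session store.
type memStore struct {
	sessions map[string]*session
	touches  int
}

// lookup returns the session if it exists, is not revoked and has not expired.
func (m *memStore) lookup(id string, now time.Time) (*session, bool) {
	s, ok := m.sessions[id]
	if !ok || s.Revoked || !now.Before(s.ExpiresAt) {
		return nil, false
	}
	return s, true
}

// touch records activity and slides the expiry forward.
func (m *memStore) touch(id string, seenAt, expiresAt time.Time) {
	s := m.sessions[id]
	s.LastSeenAt, s.ExpiresAt = seenAt, expiresAt
	m.touches++
}

// onTick applies the per-tick steps and returns false when the stream
// should close. The caller writes :keepalive and flushes when it returns true.
func onTick(m *memStore, id string, now time.Time, touchInterval, lifetime time.Duration) bool {
	s, ok := m.lookup(id, now)
	if !ok {
		return false
	}
	if now.Sub(s.LastSeenAt) >= touchInterval {
		m.touch(id, now, now.Add(lifetime))
	}
	return true
}

// runTicks drives three ticks: fresh session, stale session, revoked session.
func runTicks() (open1, open2, open3 bool, touches int) {
	start := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)
	m := &memStore{sessions: map[string]*session{
		"s1": {LastSeenAt: start, ExpiresAt: start.Add(time.Hour)},
	}}
	open1 = onTick(m, "s1", start.Add(15*time.Second), 10*time.Minute, time.Hour)
	open2 = onTick(m, "s1", start.Add(11*time.Minute), 10*time.Minute, time.Hour)
	m.sessions["s1"].Revoked = true
	open3 = onTick(m, "s1", start.Add(12*time.Minute), 10*time.Minute, time.Hour)
	return open1, open2, open3, m.touches
}

func main() {
	fmt.Println(runTicks())
}
```

Only the second tick touches the session: the first is inside the throttle window and the third finds the session revoked.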
Add "github.com/getorcha/wiki-browser/internal/auth" to the imports (for auth.SessionTouchInterval).
go test ./internal/server/ -run TestStream -v
Expected: all pass.
Pass SessionStore + SessionLifetime from main
In cmd/wiki-browser/main.go, in the server.Mux(server.Deps{ ... }) block, add:
SessionStore: collabStore,
SessionLifetime: sessionLifetime,
git add internal/server/handler_stream.go internal/server/handler_stream_test.go internal/server/handler_doc_test.go internal/server/server.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: server — SSE keepalive touches session, closes on revocation"
POST /api/stream/focus: subscriber_id lookup, rate limiter, terminal-Topic coercion
Files:
Modify: internal/server/handler_stream.go
Modify: internal/server/handler_stream_test.go
Modify: internal/server/server.go
Modify: internal/realtime/hub.go
To support the focus endpoint, the hub needs a way to look up a subscriber by id, scoped to a session, so that a subscriber id obtained by another principal cannot be used.
Append to internal/server/handler_stream_test.go:
func TestStreamFocus_setsFocusAndPublishesPresence(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
// Open the stream and read the subscribed handshake.
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
ev := <-ch
var sub struct {
SubscriberID string `json:"subscriber_id"`
}
if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil {
t.Fatal(err)
}
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
// Create a Topic to focus on.
resp := postWithCSRF(t, ts.URL+"/api/topics", `{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
defer resp.Body.Close()
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":"` + created.ID + `"}`
focusResp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
defer focusResp.Body.Close()
if focusResp.StatusCode != http.StatusNoContent {
t.Fatalf("focus status = %d body=%s", focusResp.StatusCode, readAll(t, focusResp))
}
// The next presence.updated frame should show our focused_topic_id.
pev := waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
if !strings.Contains(pev.Data, `"focused_topic_id":"`+created.ID+`"`) {
t.Fatalf("presence missing focus: %s", pev.Data)
}
}
func TestStreamFocus_unknownSubscriberReturns404(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
})
resp := postWithCSRF(t, ts.URL+"/api/stream/focus", `{"subscriber_id":"bogus","topic_id":""}`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusNotFound {
t.Fatalf("status = %d, want 404", resp.StatusCode)
}
}
func TestStreamFocus_terminalTopicCoercesToEmpty(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
// Open + handshake.
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
ev := <-ch
var sub struct{ SubscriberID string `json:"subscriber_id"` }
if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil || sub.SubscriberID == "" {
t.Fatalf("decode subscribed: err=%v data=%q", err, ev.Data)
}
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
// Create + discard the Topic.
resp := postWithCSRF(t, ts.URL+"/api/topics", `{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
resp.Body.Close()
resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/discard", `{}`)
resp.Body.Close()
// Focus on the discarded Topic must succeed silently with 204; the
// resulting presence frame must NOT include the discarded topic_id.
body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":"` + created.ID + `"}`
focusResp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
defer focusResp.Body.Close()
if focusResp.StatusCode != http.StatusNoContent {
t.Fatalf("focus status = %d body=%s", focusResp.StatusCode, readAll(t, focusResp))
}
}
func TestStreamFocus_rateLimiterRejectsBurst(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
ev := <-ch
var sub struct{ SubscriberID string `json:"subscriber_id"` }
if err := json.Unmarshal([]byte(ev.Data), &sub); err != nil || sub.SubscriberID == "" {
t.Fatalf("decode subscribed: err=%v data=%q", err, ev.Data)
}
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
body := `{"subscriber_id":"` + sub.SubscriberID + `","topic_id":""}`
var lastStatus int
for i := 0; i < 5; i++ {
resp := postWithCSRF(t, ts.URL+"/api/stream/focus", body)
lastStatus = resp.StatusCode
resp.Body.Close()
}
if lastStatus != http.StatusTooManyRequests {
t.Fatalf("after burst, last status = %d, want 429", lastStatus)
}
}
Add LookupSubscriber to the hub
In internal/realtime/hub.go:
// LookupSubscriber returns the subscription matching id, scoped to the calling
// session. Returns nil if not found OR if the subscription's session does not
// match — the latter defends against a hijacked id from another principal.
func (h *Hub) LookupSubscriber(id, sessionID string) *Subscriber {
h.mu.RLock()
defer h.mu.RUnlock()
for _, set := range h.sources {
for s := range set {
if s.id == id && s.princ.SessionID == sessionID {
return s
}
}
}
return nil
}
Why this isn't keyed by source_path: the SSE handler binds (subscriber_id, source_path, session) at Subscribe time, so the source can be inferred from the subscriber rather than supplied by the client. This keeps the request shape small: the client sends only subscriber_id and topic_id.
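The effect of the session scoping can be seen on a toy hub. This is a simplified sketch under stated assumptions: miniHub and subscriber are hypothetical types, locking is omitted, and the real Hub's fields may differ.

```go
package main

import "fmt"

// subscriber holds only the fields the lookup needs.
type subscriber struct {
	id        string
	source    string
	sessionID string
}

// miniHub groups subscribers by source path.
type miniHub struct {
	sources map[string][]*subscriber
}

// lookup returns the subscriber only when both the id and the session match.
func (h *miniHub) lookup(id, sessionID string) *subscriber {
	for _, subs := range h.sources {
		for _, s := range subs {
			if s.id == id && s.sessionID == sessionID {
				return s
			}
		}
	}
	return nil
}

// demoLookup resolves a subscriber with its own session and with another one.
func demoLookup() (ownSource string, foreignFound bool) {
	h := &miniHub{sources: map[string][]*subscriber{
		"a.md": {{id: "sub-1", source: "a.md", sessionID: "sess-alice"}},
		"b.md": {{id: "sub-2", source: "b.md", sessionID: "sess-bob"}},
	}}
	if s := h.lookup("sub-1", "sess-alice"); s != nil {
		ownSource = s.source // the source comes from the subscriber, not the request
	}
	foreignFound = h.lookup("sub-1", "sess-bob") != nil
	return ownSource, foreignFound
}

func main() {
	fmt.Println(demoLookup())
}
```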
Add to internal/server/handler_stream.go:
import (
// existing imports …
"database/sql"
"errors"
"sync"
)
type streamFocusRequest struct {
SubscriberID string `json:"subscriber_id"`
TopicID string `json:"topic_id,omitempty"`
}
const focusLimitInterval = time.Second
// focusLimiter allows at most one call per focusLimitInterval for each
// (session, subscriber) key. It records the last accepted call rather than
// keeping a token bucket, so there is no burst allowance.
// Instance state: it lives inside Deps (constructed in Mux) so each
// httptest.Server in the test suite gets its own map. Package-level state
// would cross-contaminate the test binary, which runs all internal/server
// tests in one process.
type focusLimiter struct {
mu sync.Mutex
last map[string]time.Time
}
func newFocusLimiter() *focusLimiter {
return &focusLimiter{last: map[string]time.Time{}}
}
func (f *focusLimiter) allow(key string, now time.Time) bool {
f.mu.Lock()
defer f.mu.Unlock()
prev, ok := f.last[key]
if ok && now.Sub(prev) < focusLimitInterval {
return false
}
f.last[key] = now
return true
}
func (d Deps) handleStreamFocus(w http.ResponseWriter, r *http.Request) {
if d.Realtime == nil || d.Collab == nil {
writeJSONError(w, http.StatusServiceUnavailable, "collab_unavailable")
return
}
var req streamFocusRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSONError(w, http.StatusBadRequest, "bad_json")
return
}
if req.SubscriberID == "" {
writeJSONError(w, http.StatusBadRequest, "bad_request")
return
}
sessionInfo, _ := auth.SessionFrom(r.Context())
sub := d.Realtime.LookupSubscriber(req.SubscriberID, sessionInfo.IDHash)
if sub == nil {
writeJSONError(w, http.StatusNotFound, "unknown_subscriber")
return
}
limitKey := sessionInfo.IDHash + ":" + sub.ID()
if d.FocusLimiter == nil {
writeJSONError(w, http.StatusInternalServerError, "limiter_misconfigured")
return
}
if !d.FocusLimiter.allow(limitKey, time.Now()) {
writeJSONError(w, http.StatusTooManyRequests, "too_many_focus_calls")
return
}
if req.TopicID != "" {
state, err := d.Collab.GetTopicState(req.TopicID)
switch {
case errors.Is(err, sql.ErrNoRows):
writeJSONError(w, http.StatusNotFound, "unknown_topic")
return
case err != nil:
writeJSONError(w, http.StatusInternalServerError, "topic_lookup_failed")
return
case state.SourcePath != sub.Source():
writeJSONError(w, http.StatusNotFound, "unknown_topic")
return
case !state.Open:
// Terminal Topic — coerce to empty rather than 409. The matching
// topic.discarded / topic.incorporated event is in flight; the
// chrome will repaint anyway.
req.TopicID = ""
}
}
sub.SetFocus(req.TopicID)
w.WriteHeader(http.StatusNoContent)
}
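Because allow takes the current time as an argument, the limiter's behaviour can be traced on a fixed timeline. The sketch below restates the limiter under a hypothetical name (intervalLimiter) and adds a prune method that is not part of the plan; it is shown only as one possible way to bound the map, which otherwise keeps an entry per key.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// intervalLimiter accepts at most one call per interval for each key.
type intervalLimiter struct {
	mu       sync.Mutex
	interval time.Duration
	last     map[string]time.Time
}

func newIntervalLimiter(interval time.Duration) *intervalLimiter {
	return &intervalLimiter{interval: interval, last: map[string]time.Time{}}
}

// allow records now as the last accepted call unless the previous accepted
// call for key is less than interval old. Rejected calls do not reset the
// window.
func (l *intervalLimiter) allow(key string, now time.Time) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if prev, ok := l.last[key]; ok && now.Sub(prev) < l.interval {
		return false
	}
	l.last[key] = now
	return true
}

// prune drops keys idle for longer than maxAge and reports how many were
// removed. Hypothetical addition, not part of the plan.
func (l *intervalLimiter) prune(now time.Time, maxAge time.Duration) int {
	l.mu.Lock()
	defer l.mu.Unlock()
	removed := 0
	for k, t := range l.last {
		if now.Sub(t) > maxAge {
			delete(l.last, k)
			removed++
		}
	}
	return removed
}

// demoLimiter replays four calls on a fixed timeline, then prunes.
func demoLimiter() (results []bool, pruned int) {
	l := newIntervalLimiter(time.Second)
	t0 := time.Date(2026, 5, 13, 12, 0, 0, 0, time.UTC)
	results = append(results,
		l.allow("sess:sub-1", t0),                           // first call
		l.allow("sess:sub-1", t0.Add(200*time.Millisecond)), // inside the window
		l.allow("sess:sub-2", t0.Add(200*time.Millisecond)), // different key
		l.allow("sess:sub-1", t0.Add(time.Second)),          // window elapsed
	)
	pruned = l.prune(t0.Add(time.Hour), time.Minute)
	return results, pruned
}

func main() {
	fmt.Println(demoLimiter())
}
```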
Register the route in server.go and own the limiter inside Mux
Add a field to Deps:
// FocusLimiter throttles POST /api/stream/focus per (session, subscriber).
// Mux populates a default; tests can override.
FocusLimiter *focusLimiter
Inside func Mux(d Deps) *http.ServeMux, before the routes:
if d.FocusLimiter == nil {
d.FocusLimiter = newFocusLimiter()
}
Then the route itself:
mux.Handle("POST /api/stream/focus",
d.withSession(auth.RequireCollaborator(auth.RequireCSRF(http.HandlerFunc(d.handleStreamFocus)))))
go test ./internal/server/ -run 'TestStreamFocus' -v
Expected: all pass.
git add internal/realtime/hub.go internal/server/handler_stream.go internal/server/server.go internal/server/handler_stream_test.go
git commit -m "wiki-browser: server — POST /api/stream/focus"
handleCreateTopic publishes topic.created
Files:
Modify: internal/server/topics.go
Modify: internal/server/topics_test.go
Step 1: Write the failing test
Append to internal/server/topics_test.go:
import (
// existing imports …
"time"
"github.com/getorcha/wiki-browser/internal/realtime"
)
func TestCreateTopic_publishesTopicCreated(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
<-ch // subscribed
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"hello world"}`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create status = %d", resp.StatusCode)
}
ev := waitForEvent(t, ch, "topic.created", 200*time.Millisecond)
if !strings.Contains(ev.Data, `"first_message_preview":"hello world"`) {
t.Fatalf("event missing preview: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"source_path":"a.md"`) {
t.Fatalf("event missing source_path: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"anchor_kind":"global"`) {
t.Fatalf("event missing anchor_kind: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"created_by":"daniel@getorcha.com"`) {
t.Fatalf("event missing created_by: %s", ev.Data)
}
}
go test ./internal/server/ -run TestCreateTopic_publishesTopicCreated -v
Expected: timeout waiting for topic.created (handler doesn't publish).
Publish from handleCreateTopic
In internal/server/topics.go, after the successful InsertTopicWithFirstMessage:
import (
// existing imports …
"time"
"github.com/getorcha/wiki-browser/internal/realtime"
)
func topicCreatedPayload(topicID, sourcePath, anchorKind, firstMessageBody, createdBy string, now time.Time) map[string]any {
return map[string]any{
"topic_id": topicID,
"source_path": sourcePath,
"anchor_kind": anchorKind,
"first_message_preview": preview(firstMessageBody, 160),
"created_by": createdBy,
"created_at": now.Unix(),
}
}
// preview truncates s to at most limit runes, replacing the tail with an
// ellipsis if cut. Truncation is rune-aware so multibyte content is never
// split mid-codepoint. The byte-length check is a fast path: a string of at
// most limit bytes has at most limit runes.
func preview(s string, limit int) string {
if len(s) <= limit {
return s
}
runes := []rune(s)
if len(runes) <= limit {
return s
}
if limit < 1 {
return ""
}
return string(runes[:limit-1]) + "…"
}
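To see why the truncation has to count runes rather than bytes, compare it with a naive byte slice. In this sketch truncateRunes restates the helper above under a hypothetical name, and truncateBytes is a deliberately wrong variant included only for contrast.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// truncateRunes keeps at most limit runes, ending in an ellipsis when cut.
func truncateRunes(s string, limit int) string {
	if len(s) <= limit {
		return s
	}
	runes := []rune(s)
	if len(runes) <= limit {
		return s
	}
	if limit < 1 {
		return ""
	}
	return string(runes[:limit-1]) + "…"
}

// truncateBytes is the naive version: it can cut a multibyte rune in half.
func truncateBytes(s string, limit int) string {
	if len(s) <= limit {
		return s
	}
	return s[:limit]
}

// isValid reports whether s is well-formed UTF-8.
func isValid(s string) bool {
	return utf8.ValidString(s)
}

func main() {
	s := "héllo wörld" // 11 runes, 13 bytes
	fmt.Println(truncateRunes(s, 5), isValid(truncateRunes(s, 5)))
	fmt.Println(isValid(truncateBytes(s, 2))) // cuts "é" in half
}
```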
Inside handleCreateTopic, just before the final writeJSON(w, http.StatusCreated, …):
if d.Realtime != nil {
anchorKind := "anchored"
if req.Global {
anchorKind = "global"
}
d.Realtime.Publish(req.SourcePath, realtime.Event{
Type: "topic.created",
Payload: topicCreatedPayload(topicID, req.SourcePath, anchorKind, req.FirstMessageBody, principal.UserID, time.Now()),
})
}
go test ./internal/server/ -run TestCreateTopic -v
Expected: all pass.
git add internal/server/topics.go internal/server/topics_test.go
git commit -m "wiki-browser: server — handleCreateTopic publishes topic.created"
handleAppendTopicMessage publishes topic.message_appended
Files:
Modify: internal/server/topics.go
Modify: internal/server/topics_test.go
The payload carries an optional proposal_id: it is omitted for human-authored messages and set only in the agent-proposal case, which Task 14 covers. It also carries the message's sequence, which InsertMessage allocates but does not return today.
There are two ways to get the sequence into the payload: read the row back by (topic_id, id) with a follow-up SELECT, or extend InsertMessage's signature. Dropping sequence from the payload is not an option, because the spec's event surface requires it, even though REST consumers fetch messages via /api/topics/{id}/messages and the SSE consumer refetches anyway.
Simpler: extend InsertMessage to return (id, sequence, error).
Append to internal/server/topics_test.go:
func TestAppendMessage_publishesTopicMessageAppended(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
// Create a topic to attach the message to.
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"first"}`)
defer resp.Body.Close()
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
<-ch // subscribed
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/messages", `{"body":"second"}`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
t.Fatalf("append status = %d", resp.StatusCode)
}
ev := waitForEvent(t, ch, "topic.message_appended", 200*time.Millisecond)
if !strings.Contains(ev.Data, `"topic_id":"`+created.ID+`"`) {
t.Fatalf("missing topic_id: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"kind":"human"`) {
t.Fatalf("missing kind: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"body_preview":"second"`) {
t.Fatalf("missing body_preview: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"author_user_id":"daniel@getorcha.com"`) {
t.Fatalf("missing author_user_id: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"sequence":2`) {
t.Fatalf("missing/wrong sequence: %s", ev.Data)
}
if strings.Contains(ev.Data, `"proposal_id"`) {
t.Fatalf("proposal_id should be omitted for human messages: %s", ev.Data)
}
}
Extend InsertMessage to return the allocated sequence
In internal/collab/mutators.go, change InsertMessage's signature:
// InsertMessage allocates the next per-topic sequence and inserts the row.
// Returns (id, sequence, error). Callers that need the sequence for an event
// payload read it from the return value; callers that don't discard it with _.
func (s *Store) InsertMessage(m NewMessage) (string, int, error) {
if err := validateMessage(m); err != nil {
return "", 0, err
}
var next int
err := s.send(func(db *sql.DB) error {
err := db.QueryRow(
`SELECT COALESCE(MAX(sequence), 0) + 1
FROM topic_messages WHERE topic_id = ?`,
m.TopicID,
).Scan(&next)
if err != nil {
return fmt.Errorf("allocate sequence: %w", err)
}
_, err = db.Exec(
`INSERT INTO topic_messages(
id, topic_id, kind, body, author_user_id, proposal_id,
sequence, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, unixepoch())`,
m.ID, m.TopicID, m.Kind, m.Body, m.AuthorUserID, m.ProposalID, next,
)
return err
})
if err != nil {
return "", 0, err
}
return m.ID, next, nil
}
Update every caller of InsertMessage. Use:
grep -rn '\.InsertMessage(' internal cmd
The callers are few (chiefly in internal/server/topics.go and cmd/wb-agent); update each one to the three-value signature, assigning the sequence to _ where it isn't needed.
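The two caller shapes can be sketched as follows. memStore is a hypothetical in-memory stand-in for collab.Store, used only to show the three-value return; the real store allocates the sequence in SQLite as shown above.

```go
package main

import (
	"errors"
	"fmt"
)

// memStore is a hypothetical stand-in for collab.Store. It tracks the last
// allocated sequence per topic in memory instead of in SQLite.
type memStore struct {
	last map[string]int
}

func newMemStore() *memStore { return &memStore{last: map[string]int{}} }

// InsertMessage mirrors the new return shape: (id, sequence, error).
func (s *memStore) InsertMessage(topicID, id string) (string, int, error) {
	if topicID == "" || id == "" {
		return "", 0, errors.New("topic id and message id are required")
	}
	s.last[topicID]++
	return id, s.last[topicID], nil
}

func main() {
	s := newMemStore()

	// A caller that does not need the sequence discards it with _.
	if _, _, err := s.InsertMessage("t1", "m1"); err != nil {
		panic(err)
	}

	// A caller that builds an event payload keeps the sequence.
	_, seq, err := s.InsertMessage("t1", "m2")
	if err != nil {
		panic(err)
	}
	fmt.Println("sequence for payload:", seq)
}
```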
handleAppendTopicMessage
In internal/server/topics.go, replace the IsTopicOpen switch with a single GetTopicState call (caches source_path for the publish), then replace the InsertMessage call:
state, err := d.Collab.GetTopicState(topicID)
switch {
case errors.Is(err, sql.ErrNoRows):
writeJSONError(w, http.StatusNotFound, "unknown_topic")
return
case err != nil:
writeJSONError(w, http.StatusInternalServerError, "lookup_failed")
return
case !state.Open:
writeJSONError(w, http.StatusGone, "topic_closed")
return
}
var req messageRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeJSONError(w, http.StatusBadRequest, "bad_json")
return
}
if err := validateBody(req.Body); err != nil {
writeJSONError(w, http.StatusBadRequest, "body_required")
return
}
id := uuid.NewString()
principal, _ := auth.PrincipalFrom(r.Context())
author := principal.UserID
_, seq, err := d.Collab.InsertMessage(collab.NewMessage{
ID: id, TopicID: topicID, Kind: "human", Body: req.Body, AuthorUserID: &author,
})
if err != nil {
writeJSONError(w, http.StatusInternalServerError, "insert_message_failed")
return
}
if d.Realtime != nil {
d.Realtime.Publish(state.SourcePath, realtime.Event{
Type: "topic.message_appended",
Payload: map[string]any{
"topic_id": topicID,
"message_id": id,
"sequence": seq,
"kind": "human",
"body_preview": preview(req.Body, 160),
"author_user_id": author,
"created_at": time.Now().Unix(),
},
})
}
writeJSON(w, http.StatusCreated, map[string]string{"id": id})
go test ./internal/server/ -run TestAppendMessage -v
go test ./internal/server/ -run TestTopicsAPI -v
go test ./internal/collab/ ./internal/agent/ -v
Expected: all pass. The signature change to InsertMessage is the most likely break point — fix any remaining callers.
InsertMessage's signature change touches every caller in the tree. cmd/wb-agent/main.go writes via raw SQL today, so the production binary code in cmd/wb-agent is unchanged; only cmd/wb-agent/main_test.go needs the three-value return, which is why it is staged below.
git add \
internal/collab/mutators.go \
internal/collab/mutators_test.go \
internal/collab/reader_test.go \
internal/server/topics.go \
internal/server/topics_test.go \
internal/server/e2e_resolve_test.go \
internal/agent/service_test.go \
internal/agent/e2e_test.go \
cmd/wb-agent/main_test.go
git commit -m "wiki-browser: server — handleAppendTopicMessage publishes topic.message_appended"
(Run grep -rn '\.InsertMessage(' internal cmd after the signature change to confirm every caller has been updated to the three-value return.)
handleDiscardTopic publishes topic.discarded
Files:
Modify: internal/server/handler_proposals.go
Modify: internal/server/handler_stream_test.go (the stream-related fixtures live here, so colocate the publish-side regression test)
Step 1: Write the failing test
Append to internal/server/handler_stream_test.go:
func TestDiscardTopic_publishesTopicDiscarded(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
// Create a topic.
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"hello"}`)
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
resp.Body.Close()
ch, cancel := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancel()
<-ch
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/discard", `{"reason":"never mind"}`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Fatalf("discard status = %d", resp.StatusCode)
}
ev := waitForEvent(t, ch, "topic.discarded", 200*time.Millisecond)
if !strings.Contains(ev.Data, `"topic_id":"`+created.ID+`"`) {
t.Fatalf("missing topic_id: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"discarded_by":"daniel@getorcha.com"`) {
t.Fatalf("missing discarded_by: %s", ev.Data)
}
}
go test ./internal/server/ -run TestDiscardTopic_publishesTopicDiscarded -v
In internal/server/handler_proposals.go, after the successful DiscardTopic call:
import (
// existing imports …
"github.com/getorcha/wiki-browser/internal/realtime"
)
// inside handleDiscardTopic, after writeJSON(...) but before the return — or
// reshape so the response is written last:
if d.Realtime != nil {
d.Realtime.Publish(state.SourcePath, realtime.Event{
Type: "topic.discarded",
Payload: map[string]any{
"topic_id": id,
"discarded_by": principal.UserID,
"discarded_at": discardedAt,
},
})
}
go test ./internal/server/ -v
Expected: pass.
git add internal/server/handler_proposals.go internal/server/handler_stream_test.go
git commit -m "wiki-browser: server — handleDiscardTopic publishes topic.discarded"
handleIncorporate publishes topic.incorporated
Files:
Modify: internal/server/handler_proposals.go
Modify: internal/server/e2e_resolve_test.go
The existing newFullStackFake fixture in e2e_resolve_test.go already stands up a real collab.Store + real agent.Service + FakeRunner + server.Mux. We extend it once to accept a hub, then add a new test that asserts the published event sequence end-to-end. Because the same hub also drives Task 14's agent-runner publishes, this test ends up covering both the approval-side topic.incorporated and the runner-side topic.message_appended / proposal.created / job.updated once Task 14 is implemented.
newFullStackFake to accept a hub
In internal/server/e2e_resolve_test.go, modify the function signature and the two Deps it builds:
import (
// existing imports …
"github.com/getorcha/wiki-browser/internal/realtime"
)
func newFullStackFake(t *testing.T, proposalSource, explanation string) *fullStackEnv {
return newFullStackFakeWithHub(t, proposalSource, explanation, nil)
}
func newFullStackFakeWithHub(t *testing.T, proposalSource, explanation string, hub *realtime.Hub) *fullStackEnv {
// … existing body verbatim …
service := agent.NewService(agent.ServiceConfig{
// … existing fields …
Realtime: hub, // new
})
t.Cleanup(service.Stop)
// … existing walker/index/cache …
mux := server.Mux(server.Deps{
// … existing fields …
Realtime: hub, // new
})
// … rest of the body unchanged …
}
The existing newFullStackFake callers continue to work (hub is nil for them, publishing is silently disabled).
Append to internal/server/e2e_resolve_test.go:
func TestE2EResolveFlow_publishesTopicIncorporated(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
env := newFullStackFakeWithHub(t, "alpha\nrewritten line\n", "explanation body", hub)
ch, cancel := openSSE(t, env.ts.URL+"/api/stream?source_path=docs/x.md")
defer cancel()
<-ch // subscribed
waitForEvent(t, ch, "presence.updated", 200*time.Millisecond)
topicID := env.createTopic(t, "please rewrite")
jobID := env.proposeRewrite(t, topicID)
env.waitForJob(t, jobID)
// Runner emits topic.message_appended (agent-proposal) + proposal.created
// + job.updated{succeeded} once Task 14 is implemented. Skip ahead to the
// approval event so this test passes after Task 13 alone, then strengthen
// after Task 14 lands.
proposals := env.listProposals(t, topicID)
if len(proposals) != 1 {
t.Fatalf("proposals = %d", len(proposals))
}
commitSHA := env.approve(t, proposals[0].ID)
ev := waitForEvent(t, ch, "topic.incorporated", 500*time.Millisecond)
if !strings.Contains(ev.Data, `"topic_id":"`+topicID+`"`) {
t.Fatalf("missing topic_id: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"source_path":"docs/x.md"`) {
t.Fatalf("missing source_path: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"commit_sha":"`+commitSHA+`"`) {
t.Fatalf("missing commit_sha: %s", ev.Data)
}
if !strings.Contains(ev.Data, `"incorporated_by":"daniel@getorcha.com"`) {
t.Fatalf("missing incorporated_by: %s", ev.Data)
}
}
The proposalSummary struct returned by env.listProposals and the env.approve signature already exist in e2e_resolve_test.go — read the helper definitions there if the field names need confirming.
go test ./internal/server/ -run TestE2EResolveFlow_publishesTopicIncorporated -v
Expected: the test times out waiting for topic.incorporated because handleIncorporate doesn't publish yet.
In internal/server/handler_proposals.go, add the import:
import (
// existing imports …
"time"
"github.com/getorcha/wiki-browser/internal/realtime"
)
After the successful collab.Incorporate(...) call (right before the final writeJSON(...)):
if d.Realtime != nil {
d.Realtime.Publish(state.SourcePath, realtime.Event{
Type: "topic.incorporated",
Payload: map[string]any{
"topic_id": state.ID,
"source_path": state.SourcePath,
"commit_sha": commitSHA,
"incorporated_by": principal.UserID,
"incorporated_at": time.Now().Unix(),
},
})
}
go test ./internal/server/ -v
Expected: all pass, including the new test.
git add internal/server/handler_proposals.go internal/server/e2e_resolve_test.go
git commit -m "wiki-browser: server — handleIncorporate publishes topic.incorporated"
This is the central piece for #4 ↔ #6 integration. After CompleteJob succeeds:
Successful incorporate jobs: topic.message_appended (agent-proposal row) → proposal.created → job.updated{status=succeeded}. The first two come from fresh DB reads (the runner already has the proposal via GetProposalByJobID; the agent-proposal message via GetAgentProposalMessage).
Failed incorporate jobs: job.updated only.
Perspective jobs: job.updated only; perspective.refreshed is reserved for #5 to fill.
Files:
Modify: internal/agent/service.go
Modify: internal/agent/service_test.go or internal/agent/e2e_test.go
Step 1: Write the failing test
Append to internal/agent/e2e_test.go. This test reuses the existing TestEndToEnd_IncorporateProducesProposal scaffolding pattern: a real collab.Store, a FakeRunner that inserts the proposal + agent-proposal message, and a stub hub that records every Publish call so we can assert the order.
import (
// existing imports …
"sync"
"github.com/getorcha/wiki-browser/internal/realtime"
)
// recordingHub captures Publish calls for assertion. Satisfies
// agent.HubPublisher.
type recordingHub struct {
mu sync.Mutex
events []recordedEvent
}
type recordedEvent struct {
Source string
Type string
Status string // job.updated payload's status (when applicable)
}
func (r *recordingHub) Publish(source string, evt realtime.Event) {
r.mu.Lock()
defer r.mu.Unlock()
rec := recordedEvent{Source: source, Type: evt.Type}
if evt.Type == "job.updated" {
if m, ok := evt.Payload.(map[string]any); ok {
if s, _ := m["status"].(string); s != "" {
rec.Status = s
}
}
}
r.events = append(r.events, rec)
}
func (r *recordingHub) snapshot() []recordedEvent {
r.mu.Lock()
defer r.mu.Unlock()
out := make([]recordedEvent, len(r.events))
copy(out, r.events)
return out
}
func TestEndToEnd_IncorporatePublishesPinnedEventOrderOnSuccess(t *testing.T) {
store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = store.Close() })
if _, err := store.RawDBForTest().Exec(
`INSERT INTO users(id, display_name, created_at) VALUES ('u1','U1', unixepoch())`,
); err != nil {
t.Fatal(err)
}
if err := store.InsertTopicWithFirstMessage(collab.NewTopicWithFirstMessage{
TopicID: "t1", SourcePath: "docs/foo.md",
Anchor: []byte(`{"kind":"global"}`),
CreatedBy: "u1", FirstMessageID: "m1", FirstMessageBody: "rewrite foo",
}); err != nil {
t.Fatal(err)
}
runner := agent.NewFakeRunner(func(ctx context.Context, j agent.Job) agent.RunResult {
const fakeSHA = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
propID := "p1"
if _, err := store.InsertProposal(collab.NewProposal{
ID: propID, TopicID: "t1", RevisionNumber: 1,
ProposedSource: "fake new source", BaseSourceSHA: fakeSHA,
AgentJobID: &j.ID,
}); err != nil {
return agent.RunResult{ExitCode: 1, ErrorTail: err.Error()}
}
if _, _, err := store.InsertMessage(collab.NewMessage{
ID: "msg-p1", TopicID: "t1", Kind: "agent-proposal",
Body: "proposal explanation", ProposalID: &propID,
}); err != nil {
return agent.RunResult{ExitCode: 1, ErrorTail: err.Error()}
}
return agent.RunResult{ExitCode: 0}
})
hub := &recordingHub{}
svc := agent.NewService(agent.ServiceConfig{
Store: store, Runner: runner, MaxConcurrentJobs: 1,
IncorporateTimeout: 2 * time.Second, PerspectiveTimeout: 2 * time.Second,
RepoRoot: t.TempDir(), WikiRoot: t.TempDir(), WBAgentPath: "/wb-agent",
Realtime: hub,
})
t.Cleanup(svc.Stop)
jobID, err := svc.Submit(agent.SubmitInput{
Kind: "incorporate", SourcePath: "docs/foo.md", TopicID: "t1",
})
if err != nil {
t.Fatal(err)
}
// publishJobOutcome runs AFTER CompleteJob writes status=succeeded, so
// polling for status alone races with the three publishes. Instead, wait
// for the third event to arrive (or the deadline to expire). Status check
// is done after, as a sanity guard.
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if len(hub.snapshot()) == 3 {
break
}
time.Sleep(20 * time.Millisecond)
}
if j, err := store.GetJob(jobID); err != nil || j.Status != "succeeded" {
t.Fatalf("job status = %v, err = %v", j.Status, err)
}
got := hub.snapshot()
if len(got) != 3 {
t.Fatalf("event count = %d, want 3 (got %+v)", len(got), got)
}
want := []string{"topic.message_appended", "proposal.created", "job.updated"}
for i, w := range want {
if got[i].Type != w {
t.Fatalf("event[%d] = %q, want %q (all: %+v)", i, got[i].Type, w, got)
}
if got[i].Source != "docs/foo.md" {
t.Fatalf("event[%d].Source = %q, want docs/foo.md", i, got[i].Source)
}
}
if got[2].Status != "succeeded" {
t.Fatalf("job.updated.status = %q, want succeeded", got[2].Status)
}
}
func TestEndToEnd_IncorporatePublishesOnlyJobUpdatedOnFailure(t *testing.T) {
store, err := collab.Open(collab.Config{Path: filepath.Join(t.TempDir(), "collab.db")})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = store.Close() })
if _, err := store.RawDBForTest().Exec(
`INSERT INTO users(id, display_name, created_at) VALUES ('u1','U1', unixepoch())`,
); err != nil {
t.Fatal(err)
}
if err := store.InsertTopicWithFirstMessage(collab.NewTopicWithFirstMessage{
TopicID: "t1", SourcePath: "docs/foo.md",
Anchor: []byte(`{"kind":"global"}`),
CreatedBy: "u1", FirstMessageID: "m1", FirstMessageBody: "rewrite foo",
}); err != nil {
t.Fatal(err)
}
// Runner that exits non-zero — classify() sets status=failed; the
// post-invariant check never runs because the exit code already trips.
runner := agent.NewFakeRunner(func(ctx context.Context, j agent.Job) agent.RunResult {
return agent.RunResult{ExitCode: 2, ErrorTail: "boom"}
})
hub := &recordingHub{}
svc := agent.NewService(agent.ServiceConfig{
Store: store, Runner: runner, MaxConcurrentJobs: 1,
IncorporateTimeout: 2 * time.Second, PerspectiveTimeout: 2 * time.Second,
RepoRoot: t.TempDir(), WikiRoot: t.TempDir(), WBAgentPath: "/wb-agent",
Realtime: hub,
})
t.Cleanup(svc.Stop)
jobID, err := svc.Submit(agent.SubmitInput{
Kind: "incorporate", SourcePath: "docs/foo.md", TopicID: "t1",
})
if err != nil {
t.Fatal(err)
}
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if len(hub.snapshot()) == 1 {
break
}
time.Sleep(20 * time.Millisecond)
}
if j, err := store.GetJob(jobID); err != nil || j.Status != "failed" {
t.Fatalf("job status = %v, err = %v", j.Status, err)
}
got := hub.snapshot()
if len(got) != 1 || got[0].Type != "job.updated" || got[0].Status != "failed" {
t.Fatalf("failure path published: %+v", got)
}
}
internal/agent
To avoid service_test.go importing the concrete *realtime.Hub, declare a narrow interface in internal/agent/service.go:
// HubPublisher is the narrow surface the runner uses from realtime.Hub.
// *realtime.Hub satisfies this; tests may substitute a recording fake.
type HubPublisher interface {
Publish(sourcePath string, evt realtime.Event)
}
Change ServiceConfig.Realtime to Realtime HubPublisher.
In cmd/wiki-browser/main.go, the existing assignment still works because *realtime.Hub satisfies the interface.
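One Go subtlety is worth checking when the field becomes an interface: a nil *realtime.Hub stored in a HubPublisher field is a non-nil interface value, so a guard like s.cfg.Realtime != nil no longer skips it. Fixtures that pass a nil hub pointer would hit this. The sketch below uses simplified stand-ins for realtime.Event and realtime.Hub, and asPublisher is a hypothetical helper; whether the real Hub.Publish tolerates a nil receiver is not shown in this plan.

```go
package main

import "fmt"

// Event and Hub are simplified stand-ins for realtime.Event and realtime.Hub.
type Event struct{ Type string }

type Hub struct{ published []Event }

// Publish records the event. Like most pointer-receiver methods, it would
// panic if called on a nil *Hub.
func (h *Hub) Publish(sourcePath string, evt Event) {
	h.published = append(h.published, evt)
}

// HubPublisher has the same shape as the interface declared in the plan.
type HubPublisher interface {
	Publish(sourcePath string, evt Event)
}

// Compile-time check that *Hub satisfies HubPublisher.
var _ HubPublisher = (*Hub)(nil)

// asPublisher is a hypothetical helper: it converts a possibly-nil *Hub into
// an interface value that is truly nil when the pointer is nil.
func asPublisher(h *Hub) HubPublisher {
	if h == nil {
		return nil
	}
	return h
}

func main() {
	var h *Hub // nil pointer, as a fixture without a hub would pass

	var direct HubPublisher = h
	fmt.Println(direct != nil)         // true: the interface holds a typed nil
	fmt.Println(asPublisher(h) != nil) // false: safe for the != nil guard
}
```

Converting at the call site (or giving the fixture a HubPublisher parameter) keeps the "nil means publishing is disabled" behaviour intact.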
(*Service).run
After the CompleteJob call:
// After s.cfg.Store.CompleteJob(...) returns successfully, publish events.
// Order matters for incorporate jobs (see decisions log: "Publish-after-
// commit, with pinned ordering for Agent incorporate jobs").
if s.cfg.Realtime != nil {
s.publishJobOutcome(in, jobID, status)
}
The incorporate artifact events are all-or-nothing. If either post-commit read fails unexpectedly, skip topic.message_appended and proposal.created; still publish the authoritative job.updated fallback. This preserves the pinned ordering contract and avoids shipping a partial artifact sequence.
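The all-or-nothing rule can be sketched as a pure function. plannedEvents is a hypothetical name used only for illustration; the plan's own publishJobOutcome performs the reads and the publishes inline rather than returning a list.

```go
package main

import "fmt"

// plannedEvents is a hypothetical helper that mirrors the rule above: the two
// artifact events are emitted together or not at all, and job.updated is
// always emitted last.
func plannedEvents(kind, status string, artifactReadsOK bool) []string {
	var out []string
	if kind == "incorporate" && status == "succeeded" && artifactReadsOK {
		out = append(out, "topic.message_appended", "proposal.created")
	}
	return append(out, "job.updated")
}

func main() {
	fmt.Println(plannedEvents("incorporate", "succeeded", true))
	fmt.Println(plannedEvents("incorporate", "succeeded", false))
	fmt.Println(plannedEvents("incorporate", "failed", true))
	fmt.Println(plannedEvents("perspective", "succeeded", true))
}
```

Only the first call yields three events; every other combination yields job.updated alone, which is the fallback clients use to refresh through REST.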
Add this method:
func (s *Service) publishJobOutcome(in SubmitInput, jobID, status string) {
if in.Kind == "incorporate" && status == "succeeded" {
// Both reads MUST succeed for a succeeded incorporate job — the
// post-invariant check in classify() already ran them. A transient DB
// error here would silently drop topic.message_appended +
// proposal.created while job.updated{succeeded} still shipped, exactly
// the flicker the pinned-order decision was designed to prevent.
// Log loudly; the order-pin contract is load-bearing for the spec.
prop, err := s.cfg.Store.GetProposalByJobID(jobID)
if err != nil {
slog.Warn("agent: publish-after-commit: GetProposalByJobID failed",
"job_id", jobID, "err", err)
} else if msg, err := s.cfg.Store.GetAgentProposalMessage(prop.ID); err != nil {
slog.Warn("agent: publish-after-commit: GetAgentProposalMessage failed",
"job_id", jobID, "proposal_id", prop.ID, "err", err)
} else {
s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
Type: "topic.message_appended",
Payload: map[string]any{
"topic_id": in.TopicID,
"message_id": msg.ID,
"sequence": msg.Sequence,
"kind": "agent-proposal",
"body_preview": preview(msg.Body, 160),
"proposal_id": prop.ID,
"created_at": msg.CreatedAt, // already unix seconds
},
})
s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
Type: "proposal.created",
Payload: map[string]any{
"proposal_id": prop.ID,
"topic_id": prop.TopicID,
"source_path": in.SourcePath,
"revision_number": prop.RevisionNumber,
"agent_job_id": jobID,
},
})
}
}
payload := map[string]any{
"job_id": jobID,
"kind": in.Kind,
"status": status,
}
if in.TopicID != "" {
payload["topic_id"] = in.TopicID
}
if in.PersonaName != "" {
payload["persona_name"] = in.PersonaName
}
s.cfg.Realtime.Publish(in.SourcePath, realtime.Event{
Type: "job.updated",
Payload: payload,
})
}
// preview is duplicated in this package so the agent can build payloads
// without importing internal/server. Keep it in sync with
// internal/server.preview.
func preview(s string, max int) string {
if len(s) <= max {
return s
}
runes := []rune(s)
if len(runes) <= max {
return s
}
if max < 1 {
return ""
}
return string(runes[:max-1]) + "…"
}
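A short usage sketch for preview, showing that truncation counts runes rather than bytes. The function body is copied from above so the example is self-contained; the sample strings are illustrative.

```go
package main

import "fmt"

// preview truncates s to at most max runes, replacing the tail with an
// ellipsis when it has to cut. Copied from the plan for a runnable example.
func preview(s string, max int) string {
	if len(s) <= max {
		return s
	}
	runes := []rune(s)
	if len(runes) <= max {
		return s
	}
	if max < 1 {
		return ""
	}
	return string(runes[:max-1]) + "…"
}

func main() {
	// 11 runes but 13 bytes: cut to 4 runes plus the ellipsis.
	fmt.Println(preview("héllo wörld", 5))
	// 3 runes, 6 bytes: fits by rune count, so it is returned unchanged.
	fmt.Println(preview("ééé", 3))
	// Short strings pass through untouched.
	fmt.Println(preview("second", 160))
}
```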
NOTE. The errors from GetProposalByJobID and GetAgentProposalMessage are deliberately swallowed only for the artifact events: if classification passed but the rows can't be read, the post-invariant check should have caught it first and turned the status to failed. For a defensive fallback, log the failed read, skip the incomplete artifact sequence, and still publish job.updated so clients converge through REST refresh.
Realtime into the runner setup in cmd/wiki-browser/main.go
In the agent.NewService(...) call, add:
Realtime: hub,
go test ./internal/agent/ -v
go test ./internal/server/ -v
Expected: all pass. The agent's existing e2e_test.go should continue to pass; with the recordingHub wired in, the order assertions become testable in a follow-up.
git add internal/agent/service.go internal/agent/service_test.go internal/agent/e2e_test.go cmd/wiki-browser/main.go
git commit -m "wiki-browser: agent — publish pinned event sequence on incorporate completion"
This is the integration test that proves the wiring works for the full event surface. Two SSE clients open against the same source; each mutation triggers the expected events on both.
Files:
Create: internal/server/e2e_realtime_test.go
Step 1: Write the test
package server_test
import (
"encoding/json"
"net/http"
"strings"
"testing"
"time"
"github.com/getorcha/wiki-browser/internal/realtime"
)
func TestE2E_twoClients_topicCreated_bothReceiveTheEvent(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelA()
b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelB()
<-a
<-b
waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
waitForEvent(t, b, "presence.updated", 200*time.Millisecond)
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"shared topic"}`)
defer resp.Body.Close()
if resp.StatusCode != http.StatusCreated {
t.Fatalf("create status = %d", resp.StatusCode)
}
ea := waitForEvent(t, a, "topic.created", 200*time.Millisecond)
eb := waitForEvent(t, b, "topic.created", 200*time.Millisecond)
for _, ev := range []sseEvent{ea, eb} {
if !strings.Contains(ev.Data, `"first_message_preview":"shared topic"`) {
t.Fatalf("event missing preview: %s", ev.Data)
}
}
}
func TestE2E_twoClients_messageAppend_bothReceive(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"first"}`)
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
resp.Body.Close()
a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelA()
b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelB()
<-a
<-b
waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
waitForEvent(t, b, "presence.updated", 200*time.Millisecond)
resp = postWithCSRF(t, ts.URL+"/api/topics/"+created.ID+"/messages", `{"body":"second"}`)
resp.Body.Close()
waitForEvent(t, a, "topic.message_appended", 200*time.Millisecond)
waitForEvent(t, b, "topic.message_appended", 200*time.Millisecond)
}
func TestE2E_presence_focusReflectedInOtherClient(t *testing.T) {
hub := realtime.NewHub()
t.Cleanup(hub.Close)
ts, _, _ := newTestServerWithOptions(t, testServerOptions{
SessionMiddleware: testSessionMiddleware("daniel@getorcha.com", "Daniel", "csrf"),
Realtime: hub,
StreamKeepalive: 1 * time.Second,
})
// Create a topic to focus on.
resp := postWithCSRF(t, ts.URL+"/api/topics",
`{"source_path":"a.md","global":true,"first_message_body":"focus me"}`)
var created struct{ ID string `json:"id"` }
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil || created.ID == "" {
t.Fatalf("decode topic create: err=%v", err)
}
resp.Body.Close()
a, cancelA := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelA()
b, cancelB := openSSE(t, ts.URL+"/api/stream?source_path=a.md")
defer cancelB()
evA := <-a
var subA struct{ SubscriberID string `json:"subscriber_id"` }
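if err := json.Unmarshal([]byte(evA.Data), &subA); err != nil || subA.SubscriberID == "" {
t.Fatalf("decode subscribed frame: err=%v data=%s", err, evA.Data)
}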
<-b
waitForEvent(t, a, "presence.updated", 200*time.Millisecond)
waitForEvent(t, b, "presence.updated", 200*time.Millisecond)
resp = postWithCSRF(t, ts.URL+"/api/stream/focus",
`{"subscriber_id":"`+subA.SubscriberID+`","topic_id":"`+created.ID+`"}`)
if resp.StatusCode != http.StatusNoContent {
t.Fatalf("focus status = %d", resp.StatusCode)
}
resp.Body.Close()
// B's stream should observe a presence frame including A's focus.
ev := waitForEvent(t, b, "presence.updated", 200*time.Millisecond)
if !strings.Contains(ev.Data, `"focused_topic_id":"`+created.ID+`"`) {
t.Fatalf("B missing focus in presence: %s", ev.Data)
}
}
go test ./internal/server/ -run 'TestE2E_' -v
Expected: all pass.
git add internal/server/e2e_realtime_test.go
git commit -m "wiki-browser: server — e2e multi-client realtime tests"
A final pre-merge check.
go test ./... -count=1
Expected: zero failures.
go vet ./...
Expected: clean.
make build
Expected: produces dist/wiki-browser and dist/wb-agent.
make build-arm64
Expected: produces dist/wiki-browser-arm64 and dist/wb-agent-arm64.
If the suite is green and the binaries build, no commit is needed. If you had to fix a stray import or test, commit with wiki-browser: clean up after #6 implementation.
chrome.js wiring. Per the spec's Client wiring section, the contract is fixed in #6 but implementation lives in #8 (UI integration). The server tests above prove the wire works end-to-end; the browser-side wiring lands when #8 starts.
perspective.refreshed event payload. #5 owns it. Task 14 publishes job.updated for perspective jobs, which is sufficient for #5 to extend.
proxy_buffering off;. Track separately when the deployment lands.
Coverage spot-check against the spec sections:
defer hub.Close() in main), Task 6 (defer sub.Close() in handler).
agent/e2e_test.go to extend post-implementation).