Compare commits

...

30 Commits

Author SHA1 Message Date
Otto f3ce08497a docs: add comprehensive project context file (CONTEXT.md) for agent reference
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-21 13:29:24 -04:00
overseer fd60b0bb57 Merge pull request 'CUB-200: Implement WebSocket Gateway Client' (!42) from agent/dex/CUB-200-ws-gateway-client into dev
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: https://code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/pulls/42
2026-05-21 06:54:55 -04:00
Rex b7b05bb4e3 CUB-200: fix event-loss race — register handlers before readLoop starts
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
Move registerEventHandlers() call before the readLoop goroutine starts
in connectAndRun(). This eliminates the startup window where live gateway
events were actively read and dropped as 'unhandled' because handler
registration happened only after initialSync completed.

The handlers only depend on c.agents and c.broker, which are wired in the
constructor — they do not require initialSync to have completed.

Also adds TestConnectAndRun_EventNotLostDuringSync regression test that
sends a live presence event during initial sync and asserts it is not lost.

All gateway tests pass with -race.
2026-05-20 21:52:39 +00:00
Rex d370d5ec23 CUB-200: fix WS initial sync ordering — start readLoop before initialSync
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
The root cause of the initial sync timeout was that connectAndRun called
initialSync (which uses Send/RPC) before starting readLoop, but Send's
response delivery depends on readLoop→routeFrame→handleResponse. Without
the readLoop running, agents.list and sessions.list would always time out.

Fix: start readLoop in a goroutine before calling initialSync so that
RPC responses are properly routed back to pending Send() calls. After
initialSync completes, event handlers are registered and MarkWSReady
is called. The connectAndRun function then blocks on the readLoop
goroutine's completion.

Also added TestConnectAndRun_InitialSyncOrdering which verifies that
agents are persisted from initial sync (would hang/timeout under the
old ordering).
2026-05-20 21:42:31 +00:00
Dex 1b82e1d3a6 CUB-200: resolve merge conflicts with dev — adopt dev's consolidated workflows and improved Go gateway code
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 21:26:17 +00:00
overseer 93bf434a47 Merge pull request 'CUB-125: implement real-time SSE/WebSocket in React frontend' (!46) from agent/rex/CUB-125-realtime-sse-clean into dev
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: https://code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/pulls/46
2026-05-20 13:14:29 -04:00
Rex 010408cc45 CUB-125: address Grimm review — tests, type fixes, error state circuit breaker
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Add missing 'offline' to AgentStatus union type (types/index.ts)
- Add max-retry circuit breaker to useSSE; error state is now reachable
- Wire typed SSE payloads (SSEPayloadMap discriminated union) into useRealtimeSync
- Add Vitest + 20 unit tests: useSSE lifecycle, back-off, circuit breaker,
  event parsing, cleanup; useRealtimeSync event-to-invalidation mapping
- Rebase on dev to remove stale CUB-119 legacy-deletion commit and align
  CI workflow (dev already consolidated into single dev.yml)
- Tests: npm test → 20/20 pass; Build: npm run build → 0 errors
2026-05-20 12:58:21 -04:00
overseer 23f9d4a8fb CUB-125: implement real-time SSE/WebSocket in React frontend
- Add useSSE hook with exponential back-off reconnect (1s → 30s)
- Add useRealtimeSync hook: maps SSE events to React Query invalidation
  (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all)
- Add SSEContext/SSEProvider so connection status is available app-wide
- Mount SSEProvider in main.tsx inside QueryClientProvider (no polling)
- Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header
- Update SettingsPage: replace polling interval UI with SSE status panel
- Disable React Query polling (staleTime 60s); all updates pushed via SSE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 12:58:21 -04:00
overseer 3d5bf16d37 Merge pull request 'CUB-207: unit tests for event handlers and initial sync' (!44) from agent/dex/CUB-207-event-sync-tests-v2 into dev
Dev Build & Deploy / test-and-build (push) Failing after 0s
Dev Build & Deploy / docker-build-push (push) Has been skipped
Reviewed-on: https://code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/pulls/44
2026-05-20 12:47:27 -04:00
Dex d9a1640b10 CUB-200: sync CI workflows with dev branch
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Overwrite dev.yml with dev's consolidated version (parameterized Go/Node versions, cleaner install steps)
- Add deploy-dev.yaml from dev (was missing on this branch)
- build-dev.yaml confirmed absent (was deleted on dev in PR #45)
2026-05-20 16:29:57 +00:00
Otto 5347944c4c Merge branch 'dev' into agent/dex/CUB-207-event-sync-tests-v2
Dev Build & Deploy / test-and-build (pull_request) Failing after 14m36s
Dev Build & Deploy / docker-build-push (pull_request) Has been cancelled
2026-05-20 12:24:40 -04:00
Otto 48a8598d3b CUB-CI: Consolidate workflows — remove build-dev.yaml, fix dev.yml
Dev Build & Deploy / test-and-build (push) Failing after 1s
Dev Build & Deploy / docker-build-push (push) Failing after 10m1s
2026-05-20 12:24:32 -04:00
Otto a0eb393c6c CUB-CI: Consolidate CI — switch to ubuntu-latest with manual Go/Node install
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Remove custom go-react runner label (was inconsistent on PR branches)
- Replace with ubuntu-latest + manual Go 1.23 / Node 22 install
  (actions/setup-go and setup-node don't work with Gitea Act runner)
- Remove duplicate build-dev.yaml workflow (already deleted)
- All steps: Go test → Go build → npm ci → npm lint → npm build
- Docker push on push events only (unchanged)
2026-05-20 12:23:32 -04:00
Otto d294818581 CUB-CI: Remove redundant build-dev.yaml 2026-05-20 12:23:16 -04:00
Otto 9e0366e780 CUB-CI: Remove redundant build-dev.yaml — dev.yml already handles this
build-dev.yaml uses actions/setup-go@v5 and actions/setup-node@v4 which
are incompatible with Gitea Act runner (no node20 runtime). dev.yml is
the canonical build workflow; having two competing workflows on the same
triggers was causing duplicate CI runs and misleading failures.
2026-05-20 12:22:19 -04:00
overseer 20404b30bb Merge branch 'dev' into agent/dex/CUB-207-event-sync-tests-v2
Build (Dev) / trigger-deploy (pull_request) Has been cancelled
Dev Build & Deploy / test-and-build (pull_request) Has been cancelled
Dev Build & Deploy / docker-build-push (pull_request) Has been cancelled
Build (Dev) / build-frontend (pull_request) Failing after 1s
Build (Dev) / build-go-backend (pull_request) Failing after 13m33s
2026-05-20 12:15:43 -04:00
overseer b7a54c8461 Merge pull request 'CUB-203: WebSocket client scaffold for OpenClaw gateway v3' (!41) from agent/dex/CUB-203-ws-client-scaffold into dev
Build (Dev) / build-frontend (push) Failing after 1s
Build (Dev) / trigger-deploy (push) Has been skipped
Build (Dev) / build-go-backend (push) Failing after 0s
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
Reviewed-on: https://code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/pulls/41
2026-05-20 12:14:02 -04:00
overseer b6e44cb4f8 Merge branch 'dev' into agent/dex/CUB-203-ws-client-scaffold
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 0s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
Dev Build & Deploy / test-and-build (pull_request) Has been cancelled
Dev Build & Deploy / docker-build-push (pull_request) Has been cancelled
2026-05-20 09:01:58 -04:00
Joshua King 49b959aee5 Add CI Docker image with Go 1.23 + Node 22 pre-installed, update workflow to use go-react label
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
2026-05-20 08:47:16 -04:00
Joshua King ae37d79aa8 Switch to ubuntu-dotnet runner label to bypass /var/run symlink issue
Dev Build & Deploy / test-and-build (push) Has been cancelled
Dev Build & Deploy / docker-build-push (push) Has been cancelled
2026-05-20 08:39:05 -04:00
Joshua King 8fb4183abe Add container spec to fix /var/run symlink path escape error
Dev Build & Deploy / test-and-build (push) Failing after 7s
Dev Build & Deploy / docker-build-push (push) Has been skipped
2026-05-20 08:30:49 -04:00
Dex 439741e55f CUB-207: fix unused broker variable in test
Dev Build / deploy-dev (pull_request) Has been cancelled
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
Dev Build / build-test (pull_request) Has been cancelled
openclaw/grimm-review Review in progress
2026-05-20 08:04:05 -04:00
Dex 3c26b8deba CUB-207: add unit tests for event handlers and initial sync 2026-05-20 11:58:42 +00:00
Dex 4569fef11d CUB-203: fix Grimm review blocking issues (PR #41)
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
Build (Dev) / build-frontend (pull_request) Failing after 1s
Dev Build / deploy-dev (pull_request) Has been cancelled
Dev Build / build-test (pull_request) Has been cancelled
openclaw/grimm-review All 11 findings resolved. Approved.
🔴 readLoop race: replace WriteControl close with ctx-done goroutine that closes conn
🔴 duplicate event handlers: clear handlers map before re-registering on reconnect
🔴 sync.go CurrentTask abuse: add DisplayName field to UpdateAgentRequest, use it
🔴 sync.go newRole dead code: add Role field to UpdateAgentRequest, use it
🔴 events.go handlePresence DB/SSE inconsistency: pass LastActivityAt in update, don't mutate after DB
🔴 events.go handleAgentConfig DB/SSE inconsistency: use DisplayName/Role fields in update
🟠 Send() nil-conn panic: check conn != nil before WriteJSON
🟠 readLoop prompt ctx cancellation: fixed by item #1
🟠 backoff never resets: reset to initialBackoff after successful connectAndRun
🟠 MarkWSReady double-close race: use sync.Once in Client
Extra json:"-" dead fields: removed from sessionChangedPayload, presencePayload, agentConfigPayload
UpdateAgentRequest: added DisplayName, Role, LastActivityAt fields
2026-05-20 11:47:11 +00:00
Dex 7a93d43b7e CUB-205: add gateway utility function tests + fix channel default
Build (Dev) / build-go-backend (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
Dev Build / deploy-dev (pull_request) Has been cancelled
Dev Build / build-test (pull_request) Has been cancelled
Build (Dev) / build-frontend (pull_request) Failing after 1s
2026-05-20 11:35:02 +00:00
overseer efcedde649 Merge branch 'dev' into agent/dex/CUB-203-ws-client-scaffold
Build (Dev) / build-frontend (pull_request) Failing after 2s
Build (Dev) / build-go-backend (pull_request) Failing after 14m20s
Dev Build / deploy-dev (pull_request) Has been cancelled
Build (Dev) / trigger-deploy (pull_request) Has been cancelled
Dev Build / build-test (pull_request) Has been cancelled
2026-05-20 07:30:09 -04:00
Dex e131798f3b CUB-204: wire WS client as primary, REST poller as fallback
Dev Build / build-test (pull_request) Failing after 1s
Build (Dev) / trigger-deploy (pull_request) Has been skipped
openclaw/grimm-review REJECTED — 6 blocking issues
Build (Dev) / build-go-backend (pull_request) Failing after 0s
Build (Dev) / build-frontend (pull_request) Failing after 1s
- Rename GatewayURL/GatewayPollInterval → GatewayRestURL/GatewayRestPollInterval
- Change Docker-aware defaults (host.docker.internal instead of localhost)
- Client.Start() waits for WS readiness (30s timeout), falls back to REST
- Client.SetWSClient()/MarkWSReady() for WS→REST coordination
- WSClient.SetRESTClient() so WS notifies REST on successful handshake
- main.go wires both clients: WS primary, REST fallback with cross-references
- .env.example documents WS_GATEWAY_URL, GATEWAY_TOKEN, REST fallback vars
- docker-compose.yml adds WS_GATEWAY_URL and GATEWAY_TOKEN env vars
- reference/CONTROL_CENTER_CONTEXT.md documents architecture and startup sequence
2026-05-20 11:16:05 +00:00
Dex 9062f8fa8d CUB-202: add real-time event handlers for sessions.changed, presence, agent.config
Dev Build / build-test (pull_request) Failing after 0s
2026-05-20 11:13:53 +00:00
Dex 60ba3e5b4f CUB-201: add initial sync via agents.list + sessions.list RPCs
Dev Build / build-test (pull_request) Failing after 1s
- Create gateway/sync.go with initialSync method on WSClient
- Fetch agents via agents.list RPC, persist to AgentRepo
- Fetch sessions via sessions.list RPC, map status to AgentStatus
- Merge session state (status, sessionKey, tokens) into AgentCardData
- Broadcast merged fleet as fleet.update via SSE broker
- Trigger initialSync after hello-ok handshake
- Re-sync automatically on reconnect (connectAndRun calls initialSync)
- Handle unknown gateway fields gracefully via typed extraction
2026-05-20 11:07:23 +00:00
Dex 70d39b87d1 CUB-203: add WebSocket client scaffold for OpenClaw gateway v3
Dev Build / build-test (pull_request) Failing after 14s
2026-05-20 11:02:21 +00:00
33 changed files with 4243 additions and 246 deletions
+9 -10
View File
@@ -12,16 +12,15 @@ ENVIRONMENT=development
# Format: postgresql://user:password@host:port/database?sslmode=disable
DATABASE_URL=postgresql://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable
# Gateway (OpenClaw) connection — WebSocket (primary)
# WebSocket URL for real-time OpenClaw gateway v3 protocol
GATEWAY_WS_URL=ws://localhost:18789/
# Auth token for the OpenClaw gateway (operator scope)
OPENCLAW_GATEWAY_TOKEN=
# Gateway (OpenClaw) connection
# WebSocket gateway config (primary path)
WS_GATEWAY_URL=ws://host.docker.internal:18789/
# Gateway auth token — same as OPENCLAW_GATEWAY_TOKEN (set in environment)
GATEWAY_TOKEN=
# Gateway (OpenClaw) connection — REST (fallback)
# URL to the OpenClaw gateway API for polling agent states (used only if WS fails)
GATEWAY_URL=http://localhost:18789/api/agents
# Polling interval for agent state updates
# REST poller config (fallback, only used if WS fails to connect)
GATEWAY_URL=http://host.docker.internal:18789/api/agents
# Polling interval for agent state updates (fallback only)
GATEWAY_POLL_INTERVAL=5s
# ── Frontend Variables (via Vite) ───────────────────────────────────────
@@ -33,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables
# ── Database Configuration ─────────────────────────────────────────────
# ── Database Configuration ───────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter
+126
View File
@@ -0,0 +1,126 @@
name: Deploy (Dev)
on:
repository_dispatch:
types:
- dev-build-success
workflow_dispatch:
env:
BINARY_NAME: server
DEV_HOST: ${{ secrets.DEV_HOST }}
DEV_USER: ${{ secrets.DEV_USER }}
DEPLOY_BINARY_PATH: /opt/control-center/server
DEPLOY_FRONTEND_PATH: /usr/share/nginx/html
SERVICE_NAME: control-center-server
FRONTEND_SERVICE: nginx
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Download Go binary
uses: actions/download-artifact@v4
with:
name: go-backend-binary
- name: Download frontend dist
uses: actions/download-artifact@v4
with:
name: frontend-dist
path: dist
- name: Make binary executable
run: chmod +x ${{ env.BINARY_NAME }}
- name: Generate deploy script
run: |
cat > deploy.sh <<'SCRIPT'
#!/usr/bin/env bash
set -euo pipefail
BINARY="${1}"
FRONTEND_DIST="${2:-dist}"
BINARY_PATH="${3:-/opt/control-center/server}"
FRONTEND_PATH="${4:-/usr/share/nginx/html}"
BINARY_SERVICE="${5:-control-center-server}"
FRONTEND_SERVICE="${6:-nginx}"
TIMESTAMP=$(date +%Y%m%d%H%M%S)
BACKUP="${BINARY_PATH}.${TIMESTAMP}.bak"
echo "=== deploy backend ==="
if [ -f "$BINARY_PATH" ]; then
echo "backing up current binary"
cp "$BINARY_PATH" "$BACKUP"
fi
echo "installing new binary"
cp "$BINARY" "$BINARY_PATH"
chmod +x "$BINARY_PATH"
echo "restarting service"
systemctl reload-or-restart "$BINARY_SERVICE" || systemctl restart "$BINARY_SERVICE"
sleep 3
if ! systemctl is-active --quiet "$BINARY_SERVICE"; then
echo "FAILED: $BINARY_SERVICE did not start — rolling back"
if [ -f "$BACKUP" ]; then
cp "$BACKUP" "$BINARY_PATH"
systemctl restart "$BINARY_SERVICE"
fi
exit 1
fi
echo "backend deploy ok — keeping last 3 backups"
ls -t "${BINARY_PATH}."*.bak 2>/dev/null | tail -n +4 | xargs -r rm -f
echo "=== deploy frontend ==="
if [ -d "$FRONTEND_DIST" ] && [ -n "$(ls -A "$FRONTEND_DIST" 2>/dev/null)" ]; then
rsync -a --delete "$FRONTEND_DIST/" "$FRONTEND_PATH/"
systemctl reload "$FRONTEND_SERVICE" 2>/dev/null ||:
echo "frontend deploy ok"
fi
echo "=== deploy complete ==="
SCRIPT
chmod +x deploy.sh
- name: Copy artifacts to dev server
uses: appleboy/scp-action@v0.1.7
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
source: "${{ env.BINARY_NAME }},deploy.sh,dist"
target: "/tmp/control-center-deploy"
- name: Execute deploy on dev server
uses: appleboy/ssh-action@v1
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
script: |
set -euo pipefail
cd /tmp/control-center-deploy
sudo ./deploy.sh \
"${{ env.BINARY_NAME }}" \
"dist" \
"${{ env.DEPLOY_BINARY_PATH }}" \
"${{ env.DEPLOY_FRONTEND_PATH }}" \
"${{ env.SERVICE_NAME }}" \
"${{ env.FRONTEND_SERVICE }}"
rm -rf /tmp/control-center-deploy
- name: Notify on failure
if: failure()
uses: appleboy/ssh-action@v1
with:
host: ${{ env.DEV_HOST }}
username: ${{ env.DEV_USER }}
key: ${{ secrets.DEV_SSH_KEY }}
script: |
echo "deploy failed — commit ${{ github.sha }}" > /tmp/control-center-deploy-failure.log
+9 -7
View File
@@ -7,6 +7,8 @@ on:
branches: [dev]
env:
GO_VERSION: "1.23"
NODE_VERSION: "22"
REGISTRY: code.cubecraftcreations.com
BACKEND_IMAGE: ${{ gitea.repository }}/backend
FRONTEND_IMAGE: ${{ gitea.repository }}/frontend
@@ -18,11 +20,16 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Setup Go
- name: Install Go
run: |
curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz" | sudo tar -C /usr/local -xz
echo "/usr/local/go/bin" >> $GITHUB_PATH
- name: Install Node.js
run: |
curl -fsSL "https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.xz" | sudo tar -C /usr/local --strip-components=1 -xJ
echo "/usr/local/bin" >> $GITHUB_PATH
- name: Run backend tests
run: go test ./...
working-directory: ./go-backend
@@ -31,11 +38,6 @@ jobs:
run: go build -ldflags="-w -s" -o /tmp/server ./cmd/server
working-directory: ./go-backend
- name: Setup Node
run: |
curl -sL https://deb.nodesource.com/setup_22.x | bash -
apt-get install -y nodejs
- name: Install frontend deps
run: npm ci
working-directory: ./frontend
+11
View File
@@ -0,0 +1,11 @@
FROM catthehacker/ubuntu:act-latest
# Install Go 1.23
RUN curl -sL https://go.dev/dl/go1.23.6.linux-amd64.tar.gz | tar -C /usr/local -xz
# Install Node 22
RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - \
&& apt-get install -y nodejs \
&& rm -rf /var/lib/apt/lists/*
ENV PATH="/usr/local/go/bin:${PATH}"
+2
View File
@@ -16,6 +16,8 @@ services:
- ENVIRONMENT=production
- PORT=8080
- GATEWAY_URL=http://host.docker.internal:18789/api/agents
- WS_GATEWAY_URL=ws://host.docker.internal:18789/
- GATEWAY_TOKEN=${GATEWAY_TOKEN:-}
depends_on:
db:
condition: service_healthy
+304
View File
@@ -0,0 +1,304 @@
# Control Center — Project Context
> **Last updated:** 2026-05-21
> **Repo:** `CubeCraft-Creations/Control-Center` | **Host:** `code.cubecraftcreations.com`
> **Local clone:** `/mnt/ai-storage/projects/Control-Center` | **Default branch:** `dev`
> **Discord:** `DISCORD_DEV_CONTROL_CENTER_CHANNEL_ID`
> **Linear Epic:** [CUB-119](https://linear.app/cubecraft-creations/issue/CUB-119)
---
## Overview
Real-time dashboard for monitoring and controlling the OpenClaw agent fleet. Displays agent statuses, active tasks, sessions, and projects. Uses SSE for live updates from the Go backend, which connects to the OpenClaw gateway via WebSocket for live agent data.
**Completed refactor:** ASP.NET Core + Angular → Go + React is done (CUB-119 epic). All legacy code is removed from git.
## Tech Stack
| Layer | Technology | Notes |
|-------|-----------|-------|
| Backend | Go 1.24+ | Chi router, pgx (PostgreSQL), SSE broker, gorilla/websocket |
| Frontend | React 18 + TypeScript | Vite, Tailwind CSS, React Query, TanStack Router |
| Database | PostgreSQL 16+ | snake_case naming, 2 migrations |
| Real-time | SSE + WebSocket | SSE for browser, WebSocket for OpenClaw gateway |
| Gateway Integration | WebSocket client | OpenClaw gateway `/ws` — live agent + session RPC |
| API Client | TypeScript SDK | `api-client/` — shared models, WS client, HTTP client |
| CI/CD | Gitea Actions | `.gitea/workflows/dev.yml`, `deploy-dev.yaml` |
| Deployment | Docker Compose | PostgreSQL + Go backend + React/nginx |
| Testing | Vitest, Go test | Unit + integration tests for WS client, gateway, handlers |
| Design | Kiosk layout | Bottom nav (mobile), nav rail (desktop), quick-jump drawer |
## Architecture
```
OpenClaw Gateway (WebSocket)
Go Backend (Chi + pgx)
├── Gateway WS Client (connect, reconnect, agents.list, sessions.list RPC)
├── SSE Broker (fan-out: agent.status, agent.task, fleet.update)
├── REST API (/api/agents, /api/sessions, /api/tasks, /api/projects)
└── Repository/Store layers → PostgreSQL
├── SSE /api/events
React Frontend
├── SSEProvider → useRealtimeSync → React Query cache
├── HubPage (dashboard), LogsPage, ProjectsPage, SessionsPage, SettingsPage
└── Layout (header bar + nav rail + bottom nav + quick-jump drawer)
```
### Key Architecture Decisions
1. **Go replaced ASP.NET Core** — lighter runtime, faster cold-start, better concurrency for gateway polling
2. **React replaced Angular** — lighter than Angular for dashboard/kiosk use
3. **SSE over SignalR** — simpler server-side, unidirectional events sufficient for browser updates
4. **WebSocket for gateway integration** — bidirectional needed for RPC (agents.list, sessions.list)
5. **PostgreSQL** — shared with Extrudex pattern; migrations in `go-backend/migrations/`
6. **Agent state seeded on first boot** via `gateway.SeedDemoAgents` for offline dev
## Project Structure
```
Control-Center/
├── go-backend/ # Go backend
│ ├── cmd/server/main.go # Entrypoint, wire deps, start gateway poller
│ ├── Dockerfile / go.mod / go.sum
│ ├── migrations/ # 001_initial_schema, 002_add_indexes
│ └── internal/
│ ├── config/config.go # Env vars (DATABASE_URL, GATEWAY_URL, etc.)
│ ├── db/db.go # PostgreSQL pool (pgx)
│ ├── gateway/
│ │ ├── client.go # GW poller → sync DB + SSE fan-out
│ │ ├── events.go # SSE event broker
│ │ ├── events_test.go
│ │ ├── sync.go # Initial sync from gateway
│ │ ├── sync_test.go
│ │ ├── wsclient.go # WebSocket client (handshake, connect, reconnect, RPC)
│ │ └── wsclient_test.go
│ ├── handler/
│ │ ├── agent.go # CRUD + history
│ │ ├── project.go # List projects
│ │ ├── session.go # List sessions
│ │ ├── sse.go # SSE broker: subscribe + broadcast
│ │ ├── task.go # List tasks
│ │ ├── helpers.go
│ │ ├── handler_test.go
│ │ └── mock_repos_test.go
│ ├── models/models.go # Domain types
│ ├── repository/ # DB access layer + interfaces
│ ├── router/router.go # Chi router: REST + SSE mount
│ └── store/ # Agent, Project, Session, Task stores
├── api-client/ # Shared TypeScript SDK
│ └── src/
│ ├── models/types.ts # Agent, Session, Task, Project, SSE event types
│ ├── services/http-client.ts # Axios REST client
│ ├── utils/
│ │ ├── config.ts # Client config
│ │ └── status-mapper.ts # Agent status → display mapping
│ └── websocket/
│ └── ws-client.ts # WebSocket client (handshake, Send, RPC, reconnector)
├── frontend/ # React frontend
│ ├── Dockerfile + nginx.conf
│ ├── package.json + vite.config.ts
│ ├── src/
│ │ ├── App.tsx / main.tsx
│ │ ├── components/
│ │ │ ├── ErrorBoundary.tsx
│ │ │ └── Layout.tsx # Header bar + nav rail + bottom nav + quick-jump
│ │ ├── contexts/
│ │ │ └── SSEContext.tsx # SSEProvider — wraps entire app
│ │ ├── hooks/
│ │ │ ├── useLocalStorage.ts
│ │ │ ├── useRealtimeSync.ts # SSE messages → React Query cache
│ │ │ ├── useRealtimeSync.test.tsx
│ │ │ ├── useSSE.ts # SSE: connect, reconnect, typed events
│ │ │ ├── useSSE.test.ts
│ │ │ └── useTheme.tsx
│ │ ├── pages/
│ │ │ ├── HubPage.tsx # Fleet dashboard (agent grid + stats)
│ │ │ ├── LogsPage.tsx # Agent log viewer
│ │ │ ├── ProjectsPage.tsx # Project list
│ │ │ ├── SessionsPage.tsx # Session list
│ │ │ └── SettingsPage.tsx # Settings + theme toggle
│ │ ├── services/
│ │ │ ├── api.ts # Axios REST client
│ │ │ └── sse.ts # SSE utilities
│ │ └── types/index.ts
│ └── vitest.config.ts
├── frontend-legacy/ # Original Angular frontend (kept for reference, not in git)
├── backend/ # Original ASP.NET backend (kept for reference, not in git)
│ ├── ControlCenter/ # ASP.NET Core project
│ └── Api/ # API layer
├── design/
│ ├── command-hub-spec.md # Detailed design spec
│ └── mockups/ # Desktop kiosk, mobile, quick-jump drawer
├── kiosk/
│ ├── control-center-kiosk.service # Systemd service
│ └── start-kiosk.sh # Kiosk startup script
├── reference/
│ └── CONTROL_CENTER_CONTEXT.md # Older context file (superseded by this one)
├── ci-image/Dockerfile # CI build image
├── docker-compose.yml
└── .env.example
```
## Database Schema (PostgreSQL)
### agents
| Column | Type | Notes |
|--------|------|-------|
| id | UUID PK | |
| display_name | VARCHAR(256) NOT NULL | |
| role | VARCHAR(256) | |
| status | VARCHAR(32) | active, idle, thinking, error |
| current_task | VARCHAR(512) | |
| task_progress | INTEGER DEFAULT 0 | |
| session_key | VARCHAR(256) | |
| channel | VARCHAR(256) | |
| last_activity | TIMESTAMP | |
| error_message | TEXT | |
| created_at | TIMESTAMP | |
| updated_at | TIMESTAMP | |
### sessions
| Column | Type |
|--------|------|
| id | UUID PK |
| session_key | VARCHAR(256) UNIQUE |
| agent_id | UUID FK → agents |
| channel | VARCHAR(256) |
| status | VARCHAR(32) |
| context_tokens | INTEGER |
| total_tokens | INTEGER |
| estimated_cost | NUMERIC |
| model | VARCHAR(256) |
| started_at | TIMESTAMP |
| last_activity_at | TIMESTAMP |
### tasks
| Column | Type |
|--------|------|
| id | UUID PK |
| agent_id | UUID FK → agents |
| title | VARCHAR(512) |
| description | TEXT |
| status | VARCHAR(32) |
| progress | INTEGER DEFAULT 0 |
| session_key | VARCHAR(256) |
| created_at, updated_at | TIMESTAMP |
### projects
| Column | Type |
|--------|------|
| id | UUID PK |
| name | VARCHAR(256) UNIQUE |
| description | TEXT |
| status | VARCHAR(32) |
| agent_ids | TEXT[] |
| created_at, updated_at | TIMESTAMP |
## API Endpoints
| Method | Endpoint | Description |
|--------|----------|-------------|
| GET | /health | Health check (probes DB) |
| GET | /api/agents | List all agents |
| POST | /api/agents | Create agent |
| GET | /api/agents/:id | Get agent detail |
| PUT | /api/agents/:id | Update agent |
| DELETE | /api/agents/:id | Delete agent |
| GET | /api/agents/:id/history | Agent status history |
| GET | /api/sessions | List sessions |
| GET | /api/tasks | List tasks |
| GET | /api/projects | List projects |
| GET | /api/events | SSE event stream |
## SSE Events
| Event Type | Payload |
|------------|---------|
| `agent.status` | Agent status change |
| `agent.task` | Agent current task updated |
| `agent.progress` | Task progress percentage |
| `fleet.update` | Full fleet snapshot |
| `connected` | Connection established |
## CI/CD Pipeline
### dev.yml
- Lint + typecheck: Go vet + golangci-lint + tsc
- Test: Go test + vitest
- Build: Go build + npm build
- Docker: Build and push images
- Triggers: push to dev/main
### deploy-dev.yaml
- Workflow dispatch
- SCP deploy script to dev host
- systemctl restart with rollback
## Docker Compose
| Service | Image/Build | Ports | Depends On |
|---------|-------------|-------|------------|
| postgres | postgres:16-alpine | 5432 | — |
| backend | ./go-backend/Dockerfile | 8080 | postgres (healthy) |
| frontend | ./frontend/Dockerfile | 3000 | backend (healthy) |
## Linear Issue Map
| CUB | Title | Status |
|-----|-------|--------|
| 119 | **Epic: Control Center Refactor — .NET → Go + React** | Todo |
| 120 | PostgreSQL schema + migrations | Done |
| 121 | React pages wired to real API | Done |
| 122 | React frontend scaffold | Done |
| 123 | Gateway integration + SSE streaming | Done |
| 124 | Go backend scaffold | Done |
| 125 | Real-time SSE frontend | Done |
| 126 | Docker Compose deployment | Done |
| 127 | CRUD API endpoints | Done |
| 200 | Live WebSocket gateway client (CUB-200-207 sub-epic) | In Review |
| 201 | agents.list + sessions.list RPC and data mapping | In Review |
| 202 | Real-time event subscription + SSE fan-out | In Review |
| 203 | WS client scaffold — handshake, connect, reconnect loop | In Review |
| 204 | Config, wiring, and graceful fallback | In Review |
| 205 | Unit tests — gateway utility functions | In Review |
| 206 | Unit tests — WSClient handshake, Send/RPC, frame router, reconnect | In Review |
| 207 | Unit tests — event handlers and initial sync | In Review |
### Legacy Issues (Angular/ASP.NET — all Done)
CUB-19 through CUB-63: All 27 Control Center issues completed, including minion mapping, breakroom UI, dark mode theme, agent cards, quick-jump drawer, adaptive nav, SignalR hub, and status animations.
## Known Limitations / Next Steps
1. Agent detail/history views are scaffolded but not fully implemented
2. 16-bit minion breakroom concept (CUB-59-63) was on Angular — needs React port if desired
3. `.env` must be created from `.env.example` with a valid `GATEWAY_URL` for live agent data
4. Docker containers not currently running — start with `docker compose up --build -d`
## Default Agent Assignments
| Area | Agent | Notes |
|------|-------|-------|
| Backend (Go API, Gateway WS, SSE) | Dex | gitea-dex MCP |
| Database (PostgreSQL schema) | Hex | gitea-hex MCP |
| Frontend (React, Tailwind) | Rex | gitea-rex MCP |
| Design (wireframes, UX) | Sketch | |
## Getting Started
```bash
cd /mnt/ai-storage/projects/Control-Center
git checkout dev
git pull origin dev
# Docker Compose (recommended)
cp .env.example .env # edit GATEWAY_URL first
docker compose up --build -d
# Manual
cd go-backend && go run cmd/server/main.go # → :8080
cd frontend && npm install && npm run dev # → :5173 (Vite proxy to :8080)
```
+1128 -1
View File
File diff suppressed because it is too large Load Diff
+8 -2
View File
@@ -7,7 +7,9 @@
"dev": "vite",
"build": "tsc -b && vite build",
"lint": "eslint .",
"preview": "vite preview"
"preview": "vite preview",
"test": "vitest run",
"test:watch": "vitest"
},
"dependencies": {
"@tanstack/react-query": "^5.100.9",
@@ -20,6 +22,8 @@
"devDependencies": {
"@eslint/js": "^10.0.1",
"@tailwindcss/vite": "^4.2.4",
"@testing-library/jest-dom": "^6.9.1",
"@testing-library/react": "^16.3.2",
"@types/node": "^24.12.2",
"@types/react": "^19.2.14",
"@types/react-dom": "^19.2.3",
@@ -29,10 +33,12 @@
"eslint-plugin-react-hooks": "^7.1.1",
"eslint-plugin-react-refresh": "^0.5.2",
"globals": "^17.5.0",
"jsdom": "^29.1.1",
"postcss": "^8.5.14",
"tailwindcss": "^4.2.4",
"typescript": "~6.0.2",
"typescript-eslint": "^8.58.2",
"vite": "^8.0.10"
"vite": "^8.0.10",
"vitest": "^4.1.7"
}
}
+33 -1
View File
@@ -1,6 +1,8 @@
import { useState } from 'react'
import { NavLink } from 'react-router-dom'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X } from 'lucide-react'
import { Command, Activity, FolderKanban, Monitor, Settings, Menu, X, Wifi, WifiOff, Loader } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import type { SSEStatus } from '../hooks/useSSE'
const navItems = [
{ to: '/', icon: Command, label: 'Hub' },
@@ -10,9 +12,29 @@ const navItems = [
{ to: '/settings', icon: Settings, label: 'Settings' },
]
/** Small status pill shown in the sidebar footer and mobile header. */
function SSEStatusBadge({ status, showLabel = false }: { status: SSEStatus; showLabel?: boolean }) {
const cfg = {
connected: { icon: Wifi, color: 'text-green-500', label: 'Live' },
connecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Connecting' },
reconnecting: { icon: Loader, color: 'text-yellow-500 animate-spin', label: 'Reconnecting' },
error: { icon: WifiOff, color: 'text-red-500', label: 'Disconnected' },
}[status]
const Icon = cfg.icon
return (
<div className="flex items-center gap-1.5" title={cfg.label}>
<Icon size={14} className={cfg.color} />
{showLabel && <span className={`text-xs ${cfg.color}`}>{cfg.label}</span>}
</div>
)
}
export default function Layout({ children }: { children: React.ReactNode }) {
const [expanded, setExpanded] = useState(false)
const [mobileOpen, setMobileOpen] = useState(false)
const { sseStatus } = useSSEContext()
return (
<div className="flex min-h-screen bg-surface-darkest text-on-surface">
@@ -46,6 +68,15 @@ export default function Layout({ children }: { children: React.ReactNode }) {
</NavLink>
))}
</nav>
{/* SSE connection status — footer of sidebar */}
<div className="px-4 py-3 border-t border-surface-light flex items-center gap-2">
<SSEStatusBadge status={sseStatus} />
{expanded && (
<span className="text-xs text-on-surface-muted whitespace-nowrap">
{sseStatus === 'connected' ? 'Live updates on' : sseStatus}
</span>
)}
</div>
</aside>
{/* Mobile Header + Bottom Nav */}
@@ -54,6 +85,7 @@ export default function Layout({ children }: { children: React.ReactNode }) {
<div className="flex items-center gap-2">
<Command size={22} className="text-primary" />
<span className="font-bold">Control Center</span>
<SSEStatusBadge status={sseStatus} />
</div>
<button onClick={() => setMobileOpen(!mobileOpen)} className="p-2">
{mobileOpen ? <X size={22} /> : <Menu size={22} />}
+23
View File
@@ -0,0 +1,23 @@
/**
* SSEContext — provides SSE connection status throughout the component tree.
* Mount <SSEProvider> once inside QueryClientProvider.
*/
import { createContext, useContext, type ReactNode } from 'react'
import { useRealtimeSync } from '../hooks/useRealtimeSync'
import type { SSEStatus } from '../hooks/useSSE'
interface SSEContextValue {
sseStatus: SSEStatus
}
const SSEContext = createContext<SSEContextValue>({ sseStatus: 'connecting' })
export function SSEProvider({ children }: { children: ReactNode }) {
const { sseStatus } = useRealtimeSync()
return <SSEContext.Provider value={{ sseStatus }}>{children}</SSEContext.Provider>
}
/** Access the SSE connection status from any component. */
export function useSSEContext(): SSEContextValue {
return useContext(SSEContext)
}
+129
View File
@@ -0,0 +1,129 @@
/**
* Tests for useRealtimeSync — event → query invalidation mapping.
*
* Uses .tsx extension so Vite/OXC can parse JSX in the wrapper component.
*/
import { renderHook } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import * as useSSEModule from './useSSE'
import { useRealtimeSync } from './useRealtimeSync'
import React from 'react'
import type { SSEMessage } from '../services/sse'
describe('useRealtimeSync', () => {
let queryClient: QueryClient
let mockSSEOnMessage: ((msg: { type: string; data: unknown }) => void) | null = null
beforeEach(() => {
queryClient = new QueryClient({
defaultOptions: { queries: { retry: false } },
})
mockSSEOnMessage = null
// Spy on useSSE to capture the onMessage callback
vi.spyOn(useSSEModule, 'useSSE').mockImplementation((opts) => {
mockSSEOnMessage = opts?.onMessage ?? null
return { status: 'connected' }
})
})
afterEach(() => {
vi.restoreAllMocks()
})
function render() {
return renderHook(() => useRealtimeSync(), {
wrapper: ({ children }: { children: React.ReactNode }) => (
React.createElement(QueryClientProvider, { client: queryClient }, children)
),
})
}
it('invalidates ["agents"] on agent.status event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(1)
})
it('invalidates ["tasks"] and ["agents"] on agent.task event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.task',
data: { agentId: 'a1', taskId: 't1', title: 'Test', action: 'assigned' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["tasks"] and ["agents"] on agent.progress event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'agent.progress',
data: { agentId: 'a1', taskId: 't1', progress: 50, message: 'working' },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledTimes(2)
})
it('invalidates ["agents"], ["sessions"], ["tasks"] on fleet.update event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'fleet.update',
data: { timestamp: '2026-05-20T12:00:00Z', agentCount: 5 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['agents'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['sessions'] })
expect(invalidateSpy).toHaveBeenCalledWith({ queryKey: ['tasks'] })
expect(invalidateSpy).toHaveBeenCalledTimes(3)
})
it('does nothing on connected event', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
const msg: SSEMessage = {
type: 'connected',
data: { clientCount: 1 },
}
mockSSEOnMessage!(msg)
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('does nothing on unknown event types', async () => {
const invalidateSpy = vi.spyOn(queryClient, 'invalidateQueries')
render()
mockSSEOnMessage!({ type: 'unknown.event', data: {} })
expect(invalidateSpy).not.toHaveBeenCalled()
})
it('returns sseStatus from useSSE', () => {
const { result } = render()
expect(result.current.sseStatus).toBe('connected')
})
})
+64
View File
@@ -0,0 +1,64 @@
/**
* useRealtimeSync — mounts the SSE connection once at the app level and
* wires incoming events to React Query cache invalidation.
*
* Event → query key mapping:
* agent.status → ['agents']
* agent.task → ['tasks'], ['agents']
* agent.progress → ['tasks'], ['agents']
* fleet.update → ['agents'], ['sessions'], ['tasks']
*/
import { useQueryClient } from '@tanstack/react-query'
import { useCallback } from 'react'
import { useSSE, type SSEStatus } from './useSSE'
import type { SSEMessage } from '../services/sse'
export function useRealtimeSync(): { sseStatus: SSEStatus } {
const queryClient = useQueryClient()
const handleMessage = useCallback(
(raw: { type: string; data: unknown }) => {
// Cast to discriminated union — the backend contract guarantees these shapes
const msg = raw as SSEMessage
switch (msg.type) {
case 'agent.status':
// msg.data: AgentStatusEvent { agentId, status, reason? }
void msg.data.agentId // retained for type-narrowing — ensures payload matches contract
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.task':
// msg.data: AgentTaskEvent { agentId, taskId, title, action }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'agent.progress':
// msg.data: AgentProgressEvent { agentId, taskId, progress, message? }
void msg.data.agentId
queryClient.invalidateQueries({ queryKey: ['tasks'] })
queryClient.invalidateQueries({ queryKey: ['agents'] })
break
case 'fleet.update':
// msg.data: FleetUpdateEvent { timestamp, agentCount }
void msg.data.agentCount
queryClient.invalidateQueries({ queryKey: ['agents'] })
queryClient.invalidateQueries({ queryKey: ['sessions'] })
queryClient.invalidateQueries({ queryKey: ['tasks'] })
break
default:
// 'connected' and unknown events — no action needed
break
}
},
[queryClient],
)
const { status: sseStatus } = useSSE({ onMessage: handleMessage })
return { sseStatus }
}
+267
View File
@@ -0,0 +1,267 @@
/**
* Tests for useSSE — SSE connection lifecycle, back-off, event parsing, and cleanup.
*
* jsdom does not include EventSource, so we mock it completely.
*/
import { renderHook, act, waitFor } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { useSSE } from './useSSE'
// ---------------------------------------------------------------------------
// Mock EventSource — defined as a plain class so `new EventSource()` works
// ---------------------------------------------------------------------------
class MockEventSource {
url: string
onopen: (() => void) | null = null
onerror: ((evt: Event) => void) | null = null
onmessage: ((evt: MessageEvent) => void) | null = null
private listeners: Map<string, Array<(evt: Event) => void>> = new Map()
readyState: number = 0
constructor(url: string) {
this.url = url
}
addEventListener(type: string, handler: (evt: Event) => void) {
if (!this.listeners.has(type)) this.listeners.set(type, [])
this.listeners.get(type)!.push(handler)
}
removeEventListener() { /* no-op for tests */ }
close() {
this.readyState = 2
this.onopen = null
this.onerror = null
this.onmessage = null
this.listeners.clear()
}
// Test helpers
_simulateOpen() { this.onopen?.() }
_simulateError() { this.onerror?.(new Event('error')) }
_simulateNamedEvent(type: string, data: string) {
const handlers = this.listeners.get(type)
if (handlers) {
const evt = new MessageEvent(type, { data }) as Event
handlers.forEach((h) => h(evt))
}
}
_simulateMessage(data: string) {
this.onmessage?.(new MessageEvent('message', { data }) as MessageEvent)
}
static readonly CONNECTING = 0
static readonly OPEN = 1
static readonly CLOSED = 2
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
let esInstances: MockEventSource[]
describe('useSSE', () => {
beforeEach(() => {
esInstances = []
// Replace global EventSource with our mock class
Object.defineProperty(globalThis, 'EventSource', {
// The mock must use a class for `new EventSource()` to work
value: class extends MockEventSource {
constructor(url: string) {
super(url)
esInstances.push(this)
}
},
writable: true,
configurable: true,
})
vi.useFakeTimers({ shouldAdvanceTime: true })
})
afterEach(() => {
vi.restoreAllMocks()
vi.useRealTimers()
})
// ── Initial connection ──────────────────────────────────────────────────
it('starts in "connecting" state and creates an EventSource', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events' }))
expect(result.current.status).toBe('connecting')
expect(esInstances.length).toBeGreaterThanOrEqual(1)
expect(esInstances[0].url).toBe('/api/events')
})
it('transitions to "connected" on open', async () => {
const onOpen = vi.fn()
const { result } = renderHook(() => useSSE({ url: '/api/events', onOpen }))
act(() => { esInstances[0]._simulateOpen() })
await waitFor(() => {
expect(result.current.status).toBe('connected')
})
expect(onOpen).toHaveBeenCalledTimes(1)
})
// ── Reconnection with exponential back-off ──────────────────────────────
it('retries after error with exponential back-off', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 30000 }),
)
// First error → reconnecting, retry at 1s
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(esInstances).toHaveLength(1)
// Advance 1000ms → second EventSource created
act(() => { vi.advanceTimersByTime(1000) })
expect(esInstances).toHaveLength(2)
// Second error → reconnecting, retry at 2s
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(2000) })
expect(esInstances).toHaveLength(3)
// Third error → reconnecting, retry at 4s
act(() => { esInstances[2]._simulateError() })
act(() => { vi.advanceTimersByTime(4000) })
expect(esInstances).toHaveLength(4)
})
it('caps reconnect delay at reconnectMaxMs', async () => {
renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 1000, reconnectMaxMs: 10000 }),
)
// Force 5 errors to push the exponent past the cap
for (let i = 0; i < 5; i++) {
act(() => { esInstances[i]._simulateError() })
const expectedDelay = Math.min(1000 * 2 ** i, 10000)
act(() => { vi.advanceTimersByTime(expectedDelay) })
}
// 6 ES instances created (initial + 5 retries)
expect(esInstances).toHaveLength(6)
})
// ── Circuit-breaker (max retries) ───────────────────────────────────────
it('transitions to "error" after reconnectLimit is exceeded', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 2 }),
)
// First error → reconnecting
act(() => { esInstances[0]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
// Advance → retry
act(() => { vi.advanceTimersByTime(100) })
// Second error → reconnecting (attempt 2, still ≤ limit)
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
act(() => { vi.advanceTimersByTime(200) })
// Third error → limit exceeded (3 > 2) → error
act(() => { esInstances[2]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('error') })
})
it('resets reconnect counter on successful connection', async () => {
const { result } = renderHook(() =>
useSSE({ url: '/api/events', reconnectBaseMs: 100, reconnectLimit: 3 }),
)
// Two errors then a successful connect
act(() => { esInstances[0]._simulateError() })
act(() => { vi.advanceTimersByTime(100) })
act(() => { esInstances[1]._simulateOpen() })
await waitFor(() => { expect(result.current.status).toBe('connected') })
// Now error again — counter should be reset, so we get fresh attempts
act(() => { esInstances[1]._simulateError() })
await waitFor(() => { expect(result.current.status).toBe('reconnecting') })
expect(result.current.status).toBe('reconnecting')
})
// ── Cleanup on unmount ───────────────────────────────────────────────────
it('closes EventSource on unmount', () => {
const closeSpy = vi.spyOn(MockEventSource.prototype, 'close')
const { unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
expect(closeSpy).toHaveBeenCalled()
})
it('does not update state after unmount', async () => {
const { result, unmount } = renderHook(() => useSSE({ url: '/api/events' }))
unmount()
// These should be no-ops after unmount (mountedRef guards)
act(() => { esInstances[0]._simulateOpen() })
act(() => { esInstances[0]._simulateError() })
// State should not have changed
expect(result.current.status).toBe('connecting')
})
// ── Event parsing ───────────────────────────────────────────────────────
it('parses valid JSON data into objects', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', JSON.stringify({ agentId: 'a1', status: 'active' }))
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: { agentId: 'a1', status: 'active' },
})
})
it('passes invalid JSON through as raw string', async () => {
const onMessage = vi.fn()
renderHook(() => useSSE({ url: '/api/events', onMessage }))
act(() => {
esInstances[0]._simulateNamedEvent('agent.status', 'not valid json {{{')
})
expect(onMessage).toHaveBeenCalledWith({
type: 'agent.status',
data: 'not valid json {{{',
})
})
// ── enabled=false skips connection ──────────────────────────────────────
it('does not create EventSource when enabled=false', () => {
const { result } = renderHook(() => useSSE({ url: '/api/events', enabled: false }))
expect(esInstances).toHaveLength(0)
expect(result.current.status).toBe('connecting')
})
// ── onError callback ────────────────────────────────────────────────────
it('calls onError on connection failure', async () => {
const onError = vi.fn()
renderHook(() =>
useSSE({ url: '/api/events', onError, reconnectBaseMs: 100 }),
)
act(() => { esInstances[0]._simulateError() })
expect(onError).toHaveBeenCalledTimes(1)
})
// ── Default URL ─────────────────────────────────────────────────────────
it('uses /api/events as default URL', () => {
renderHook(() => useSSE())
expect(esInstances[0].url).toBe('/api/events')
})
})
+180
View File
@@ -0,0 +1,180 @@
import { useEffect, useRef, useCallback, useState } from 'react'
/** SSE connection state reported to consumers. */
export type SSEStatus = 'connecting' | 'connected' | 'reconnecting' | 'error'
/** Typed SSE event received from the backend. */
export interface SSEMessage {
/** event: field from the SSE frame */
type: string
/** parsed JSON from the data: field */
data: unknown
}
export interface UseSSEOptions {
/** Endpoint URL — defaults to /api/events */
url?: string
/** Called for every SSE message (all event types) */
onMessage?: (msg: SSEMessage) => void
/** Called when connection opens or reconnects */
onOpen?: () => void
/** Called on every connection error (both transient and terminal) */
onError?: (err: Event) => void
/** Base delay in ms before the first reconnect attempt (default 1 000) */
reconnectBaseMs?: number
/** Maximum reconnect delay in ms (default 30 000) */
reconnectMaxMs?: number
/**
* Maximum number of consecutive reconnect attempts before giving up.
* When the limit is reached, status transitions to 'error'.
* Default undefined (unlimited).
*/
reconnectLimit?: number
/** Set false to disable auto-connect (useful in tests) */
enabled?: boolean
}
const SSE_EVENTS = ['agent.status', 'agent.task', 'agent.progress', 'fleet.update', 'connected'] as const
/**
* useSSE — mounts a persistent SSE connection to the Control Center backend.
*
* Handles:
* - Initial connection on mount
* - Exponential back-off reconnection on drop (1s → 2s → 4s … capped at reconnectMaxMs)
* - Circuit-breaker: after reconnectLimit consecutive failures, transitions to 'error'
* - Cleanup on unmount
* - All five event types: agent.status, agent.task, agent.progress, fleet.update, connected
*
* The 'connected' SSE event is an application-level handshake sent by the backend
* after the transport opens. This is distinct from onOpen, which fires at the
* transport level when the EventSource HTTP connection is established.
*/
export function useSSE({
url = '/api/events',
onMessage,
onOpen,
onError,
reconnectBaseMs = 1_000,
reconnectMaxMs = 30_000,
reconnectLimit,
enabled = true,
}: UseSSEOptions = {}): { status: SSEStatus } {
const [status, setStatus] = useState<SSEStatus>('connecting')
// Stable refs so the effect doesn't need to re-run when callbacks change
const onMessageRef = useRef(onMessage)
const onOpenRef = useRef(onOpen)
const onErrorRef = useRef(onError)
onMessageRef.current = onMessage
onOpenRef.current = onOpen
onErrorRef.current = onError
const reconnectAttemptRef = useRef(0)
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null)
const esRef = useRef<EventSource | null>(null)
const mountedRef = useRef(true)
const clearReconnectTimer = useCallback(() => {
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current)
reconnectTimerRef.current = null
}
}, [])
const connect = useCallback(() => {
if (!mountedRef.current || !enabled) return
// Clean up any existing connection
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
setStatus(reconnectAttemptRef.current === 0 ? 'connecting' : 'reconnecting')
const es = new EventSource(url)
esRef.current = es
es.onopen = () => {
if (!mountedRef.current) return
reconnectAttemptRef.current = 0
setStatus('connected')
onOpenRef.current?.()
}
es.onerror = (evt) => {
if (!mountedRef.current) return
// EventSource auto-retries but we manage our own to get back-off control
es.close()
esRef.current = null
onErrorRef.current?.(evt)
reconnectAttemptRef.current += 1
// Circuit-breaker: give up after reconnectLimit consecutive failures
if (reconnectLimit !== undefined && reconnectAttemptRef.current > reconnectLimit) {
setStatus('error')
return
}
// Exponential back-off: 1s, 2s, 4s … capped at reconnectMaxMs
// Note: attempt is 1-based here (already incremented), so we use attempt-1 for the exponent
const delay = Math.min(
reconnectBaseMs * 2 ** (reconnectAttemptRef.current - 1),
reconnectMaxMs,
)
setStatus('reconnecting')
clearReconnectTimer()
reconnectTimerRef.current = setTimeout(() => {
if (mountedRef.current) connect()
}, delay)
}
// Register listeners for all known event types
for (const eventType of SSE_EVENTS) {
es.addEventListener(eventType, (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: eventType, data })
})
}
// Catch-all for unnamed events (type == 'message').
// Won't fire for the named events registered via addEventListener above.
es.onmessage = (evt: MessageEvent) => {
if (!mountedRef.current) return
let data: unknown = evt.data
try {
data = JSON.parse(evt.data as string)
} catch {
// leave as raw string
}
onMessageRef.current?.({ type: 'message', data })
}
}, [url, enabled, reconnectBaseMs, reconnectMaxMs, reconnectLimit, clearReconnectTimer])
useEffect(() => {
mountedRef.current = true
if (enabled) connect()
return () => {
mountedRef.current = false
clearReconnectTimer()
if (esRef.current) {
esRef.current.close()
esRef.current = null
}
}
}, [connect, enabled, clearReconnectTimer])
return { status }
}
+8 -1
View File
@@ -4,13 +4,16 @@ import { QueryClient, QueryClientProvider } from '@tanstack/react-query'
import { BrowserRouter } from 'react-router-dom'
import ErrorBoundary from './components/ErrorBoundary'
import { ThemeProvider } from './hooks/useTheme'
import { SSEProvider } from './contexts/SSEContext'
import './index.css'
import App from './App'
const queryClient = new QueryClient({
defaultOptions: {
queries: {
staleTime: 30_000,
// No polling — real-time updates come through SSE.
// staleTime is kept high; data is pushed, not pulled.
staleTime: 60_000,
refetchOnWindowFocus: false,
retry: 1,
},
@@ -22,9 +25,13 @@ createRoot(document.getElementById('root')!).render(
<ErrorBoundary>
<ThemeProvider>
<QueryClientProvider client={queryClient}>
{/* SSEProvider must live inside QueryClientProvider so it can call
useQueryClient() to invalidate caches on incoming events. */}
<SSEProvider>
<BrowserRouter>
<App />
</BrowserRouter>
</SSEProvider>
</QueryClientProvider>
</ThemeProvider>
</ErrorBoundary>
+44 -40
View File
@@ -1,18 +1,36 @@
import { useTheme } from '../hooks/useTheme'
import { useLocalStorage } from '../hooks/useLocalStorage'
import { Sun, Moon, Monitor, Zap, Clock } from 'lucide-react'
import { useSSEContext } from '../contexts/SSEContext'
import { Sun, Moon, Monitor, Zap, Radio } from 'lucide-react'
const REFRESH_PRESETS = [
{ label: '5s', value: 5_000 },
{ label: '10s', value: 10_000 },
{ label: '30s', value: 30_000 },
{ label: '60s', value: 60_000 },
]
const SSE_STATUS_COPY: Record<string, { label: string; description: string; color: string }> = {
connected: {
label: 'Connected',
description: 'Real-time updates are active. Agent status, tasks, and progress stream live.',
color: 'text-green-500',
},
connecting: {
label: 'Connecting…',
description: 'Establishing SSE connection to the backend.',
color: 'text-yellow-500',
},
reconnecting: {
label: 'Reconnecting…',
description: 'Connection lost. Retrying with exponential back-off.',
color: 'text-yellow-500',
},
error: {
label: 'Disconnected',
description: 'Could not connect to the SSE endpoint. Check that the backend is reachable.',
color: 'text-red-500',
},
}
export default function SettingsPage() {
const { isDark, toggleTheme } = useTheme()
const [gatewayUrl, setGatewayUrl] = useLocalStorage('cc-gateway-url', '')
const [refreshInterval, setRefreshInterval] = useLocalStorage('cc-refresh-interval', 30_000)
const { sseStatus } = useSSEContext()
const sseInfo = SSE_STATUS_COPY[sseStatus] ?? SSE_STATUS_COPY.error
return (
<div className="space-y-8 max-w-2xl">
@@ -80,45 +98,31 @@ export default function SettingsPage() {
</div>
</section>
{/* Refresh */}
{/* Real-time connection status */}
<section className="space-y-4">
<h2 className="text-lg font-semibold flex items-center gap-2">
<Clock size={20} className="text-primary" />
Auto Refresh
<Radio size={20} className="text-primary" />
Real-time Updates
</h2>
<div className="p-5 rounded-xl border border-surface-light bg-surface-dark space-y-3">
<p className="text-sm text-on-surface-variant">Data refresh interval for agent status and logs</p>
<div className="flex flex-col gap-2">
<input
type="range"
min="0"
max="3"
step="1"
value={REFRESH_PRESETS.findIndex((p) => p.value === refreshInterval)}
onChange={(e) => {
const idx = parseInt(e.target.value)
setRefreshInterval(REFRESH_PRESETS[idx].value)
}}
className="w-full accent-primary"
/>
<div className="flex justify-between text-xs text-on-surface-muted">
{REFRESH_PRESETS.map((p) => (
<button
key={p.label}
onClick={() => setRefreshInterval(p.value)}
className={`px-3 py-1 rounded-lg transition-colors ${
refreshInterval === p.value
? 'bg-primary/10 text-primary'
: 'hover:bg-surface-light'
}`}
>
{p.label}
</button>
))}
<div className="flex items-center justify-between">
<div>
<p className="font-medium">SSE Connection</p>
<p className="text-sm text-on-surface-variant mt-0.5">{sseInfo.description}</p>
</div>
<span className={`text-sm font-semibold whitespace-nowrap ${sseInfo.color}`}>
{sseInfo.label}
</span>
</div>
<p className="text-xs text-on-surface-muted">
Endpoint: <code className="bg-surface-light px-1.5 py-0.5 rounded text-on-surface-variant">/api/events</code>
&nbsp;·&nbsp;Events: agent.status, agent.task, agent.progress, fleet.update
</p>
<p className="text-xs text-on-surface-muted">
Polling is disabled. All status updates are pushed from the server over a persistent SSE connection.
The client reconnects automatically with exponential back-off on drop.
</p>
</div>
</section>
</div>
+72
View File
@@ -0,0 +1,72 @@
/**
* SSE event payload types matching the Go backend (internal/handler/sse.go).
*
* Event format on the wire:
* event: <eventType>
* data: <json>
*
* The types below define the backend contract. The SSEPayloadMap maps
* each event type string to its expected payload shape. SSEMessage is a
* discriminated union on `type` — when you switch on msg.type, TypeScript
* narrows msg.data to the correct payload interface automatically.
*/
import type { AgentStatus } from '../types'
/** agent.status — agent came online, went offline, changed state */
export interface AgentStatusEvent {
agentId: string
status: AgentStatus
/** Optional human-readable reason (e.g. error message) */
reason?: string
}
/** agent.task — a task was assigned to or completed by an agent */
export interface AgentTaskEvent {
agentId: string
taskId: string
title: string
action: 'assigned' | 'completed' | 'failed'
}
/** agent.progress — incremental progress update for a running task */
export interface AgentProgressEvent {
agentId: string
taskId: string
progress: number
/** Optional description of what is currently happening */
message?: string
}
/**
* fleet.update — bulk refresh of all agents (e.g. after a deployment).
* The backend may send partial or complete agent state.
*/
export interface FleetUpdateEvent {
/** ISO timestamp of when the snapshot was taken */
timestamp: string
/** Number of agents in the fleet */
agentCount: number
}
/** Union of all SSE data payloads keyed by event type. */
export type SSEPayloadMap = {
'agent.status': AgentStatusEvent
'agent.task': AgentTaskEvent
'agent.progress': AgentProgressEvent
'fleet.update': FleetUpdateEvent
connected: { clientCount: number }
message: unknown
}
/**
* Discriminated SSE message — the `type` field narrows `data` via SSEPayloadMap.
*
* Usage:
* if (msg.type === 'agent.status') {
* msg.data.agentId // ✅ TypeScript knows this is AgentStatusEvent
* }
*/
export type SSEMessage = {
[K in keyof SSEPayloadMap]: { type: K; data: SSEPayloadMap[K] }
}[keyof SSEPayloadMap]
+1
View File
@@ -0,0 +1 @@
import '@testing-library/jest-dom'
+1 -1
View File
@@ -1,4 +1,4 @@
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error'
export type AgentStatus = 'active' | 'idle' | 'thinking' | 'error' | 'offline'
export interface Agent {
id: string
+1 -1
View File
@@ -4,7 +4,7 @@
"target": "es2023",
"lib": ["ES2023", "DOM"],
"module": "esnext",
"types": ["vite/client"],
"types": ["vite/client", "vitest/globals"],
"skipLibCheck": true,
/* Bundler mode */
+1 -1
View File
@@ -20,5 +20,5 @@
"erasableSyntaxOnly": true,
"noFallthroughCasesInSwitch": true
},
"include": ["vite.config.ts"]
"include": ["vite.config.ts", "vitest.config.ts"]
}
+11
View File
@@ -0,0 +1,11 @@
import { defineConfig } from 'vitest/config'
import react from '@vitejs/plugin-react'
export default defineConfig({
plugins: [react()],
test: {
environment: 'jsdom',
globals: true,
setupFiles: ['./src/test-setup.ts'],
},
})
+11 -10
View File
@@ -63,29 +63,30 @@ func main() {
Broker: broker,
})
// ── Gateway: WS primary + REST fallback ────────────────────────────────
// WebSocket client (primary — real-time events via OpenClaw v3 protocol)
// ── Gateway clients (WS primary, REST fallback) ───────────────────
// WS gateway client (primary path)
wsClient := gateway.NewWSClient(gateway.WSConfig{
URL: cfg.WSGatewayURL,
AuthToken: cfg.WSGatewayToken,
}, agentRepo, broker, logger)
// REST polling client (fallback — only used if WS connection fails)
restClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayURL,
PollInterval: cfg.GatewayPollInterval,
// REST gateway client (fallback — only polls if WS fails to connect)
gwClient := gateway.NewClient(gateway.Config{
URL: cfg.GatewayRestURL,
PollInterval: cfg.GatewayRestPollInterval,
}, agentRepo, broker)
// Wire them: WS notifies REST to stand down on successful connect
wsClient.SetRESTClient(restClient)
// Wire them together: REST defers to WS when WS is connected
wsClient.SetRESTClient(gwClient)
gwClient.SetWSClient(wsClient)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start WS client first (primary)
go wsClient.Start(ctx)
// Start REST client (fallback polling)
go restClient.Start(ctx)
// Start REST client (will wait for WS, then stand down or fall back)
go gwClient.Start(ctx)
// ── Server ─────────────────────────────────────────────────────────────
srv := &http.Server{
+7 -7
View File
@@ -15,10 +15,10 @@ type Config struct {
CORSOrigin string
LogLevel string
Environment string
GatewayURL string // REST fallback URL
GatewayPollInterval time.Duration // REST fallback poll interval
WSGatewayURL string // WebSocket gateway URL
WSGatewayToken string // WebSocket auth token
GatewayRestURL string
GatewayRestPollInterval time.Duration
WSGatewayURL string
WSGatewayToken string
}
// Load reads configuration from environment variables, applying defaults where
@@ -30,9 +30,9 @@ func Load() *Config {
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"),
GatewayURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("GATEWAY_WS_URL", "ws://host.docker.internal:18789/"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
}
}
+35 -12
View File
@@ -13,6 +13,7 @@ import (
"fmt"
"log/slog"
"net/http"
"sync"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
@@ -22,15 +23,17 @@ import (
// Client polls the OpenClaw gateway for agent status and keeps the database
// and SSE broker in sync. When a WSClient is set, the REST poller becomes a
// fallback that only activates if the WS connection fails.
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect after initial backoff retries.
type Client struct {
url string
pollInterval time.Duration
httpClient *http.Client
agents repository.AgentRepo
broker *handler.Broker
wsClient *Client // optional WS client; when set, REST is fallback only
wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race
}
// Config holds gateway client configuration, typically loaded from environment.
@@ -63,36 +66,56 @@ func NewClient(cfg Config, agents repository.AgentRepo, broker *handler.Broker)
// to it. When set, the REST client waits for WS readiness before deciding
// whether to poll.
func (c *Client) SetWSClient(ws *WSClient) {
_ = ws // stored for future reconnection coordination
c.wsClient = ws
}
// MarkWSReady signals that the WS connection is live and the REST poller
// should stand down. Called by WSClient after a successful handshake.
func (c *Client) MarkWSReady() {
select {
case <-c.wsReady:
// already closed
default:
c.wsReadyOnce.Do(func() {
close(c.wsReady)
}
})
}
// Start begins the gateway client loop. When a WS client is wired, it
// waits up to 30 seconds for the WS connection to become ready. If WS
// connects, the REST poller stands down. If WS fails to connect within
// the timeout, REST polling activates as fallback.
// connects, the REST poller stands down and only logs periodically. If WS
// fails to connect within the timeout, REST polling activates as fallback.
func (c *Client) Start(ctx context.Context) {
slog.Info("gateway client starting",
if c.wsClient != nil {
slog.Info("gateway client waiting for WS connection", "timeout", "30s")
select {
case <-c.wsReady:
slog.Info("gateway client using WS — REST poller standing down")
// WS is live; keep this goroutine alive but idle. If WS
// disconnects later, we could re-enter polling, but for now
// the WS client handles its own reconnection.
<-ctx.Done()
slog.Info("gateway client stopped (WS mode)")
return
case <-time.After(30 * time.Second):
slog.Warn("gateway client: WS not ready after 30s — falling back to REST polling",
"url", c.url,
"pollInterval", c.pollInterval.String())
case <-ctx.Done():
slog.Info("gateway client stopped while waiting for WS")
return
}
} else {
slog.Info("gateway client using REST polling (no WS client configured)",
"url", c.url,
"pollInterval", c.pollInterval.String())
}
// REST fallback polling
ticker := time.NewTicker(c.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("gateway client stopped")
slog.Info("gateway client stopped (REST fallback)")
return
case <-ticker.C:
c.poll(ctx)
+76 -32
View File
@@ -51,8 +51,18 @@ type agentConfigPayload struct {
// ── Handler registration ─────────────────────────────────────────────────
// registerEventHandlers sets up all live event handlers on the WSClient.
// Called once after a successful handshake + initial sync.
// Call this once after a successful handshake + initial sync.
func (c *WSClient) registerEventHandlers() {
if c.agents == nil || c.broker == nil {
c.logger.Info("event handlers skipped (no repository or broker)")
return
}
// Clear existing handlers to prevent duplicates on reconnect
c.mu.Lock()
c.handlers = make(map[string][]eventHandler)
c.mu.Unlock()
c.OnEvent("sessions.changed", c.handleSessionsChanged)
c.OnEvent("presence", c.handlePresence)
c.OnEvent("agent.config", c.handleAgentConfig)
@@ -68,11 +78,14 @@ func (c *WSClient) registerEventHandlers() {
// For each changed session: map the gateway status to an AgentStatus, update
// the agent in the DB, then broadcast via SSE.
func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
c.logger.Debug("handleSessionsChanged", "payload", string(payload))
c.logger.Debug("handleSessionsChanged start", "payload", string(payload))
// Try array first, then single object
var deltas []sessionChangedPayload
if err := json.Unmarshal(payload, &deltas); err != nil || len(deltas) == 0 {
if err := json.Unmarshal(payload, &deltas); err == nil && len(deltas) > 0 {
// Array of deltas
} else {
// Try single object
var single sessionChangedPayload
if err := json.Unmarshal(payload, &single); err != nil {
c.logger.Warn("sessions.changed: unparseable payload, skipping", "error", err)
@@ -92,27 +105,43 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
agentStatus := mapSessionStatus(d.Status)
// Build partial update
update := models.UpdateAgentRequest{
Status: &agentStatus,
}
// Session key
if d.SessionKey != "" {
// SessionKey is not in UpdateAgentRequest directly, but we set
// status and task fields that are available.
}
// Current task
if d.CurrentTask != "" {
update.CurrentTask = &d.CurrentTask
}
// Task progress
if d.TaskProgress != nil {
update.TaskProgress = d.TaskProgress
} else if d.TotalTokens > 0 {
// Derive progress from token count as fallback
prog := min(d.TotalTokens/100, 100)
update.TaskProgress = &prog
}
// Task elapsed
if d.TaskElapsed != "" {
update.TaskElapsed = &d.TaskElapsed
}
// Error message
if d.ErrorMessage != "" {
update.ErrorMessage = &d.ErrorMessage
}
// If session ended, clear task and progress
// If session ended (done or empty status), set agent to idle and
// clear the current task
if agentStatus == models.AgentStatusIdle {
emptyTask := ""
update.CurrentTask = &emptyTask
@@ -120,7 +149,7 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
update.TaskProgress = &zeroProg
}
// DB update first
// Update DB first
updated, err := c.agents.Update(ctx, d.AgentID, update)
if err != nil {
c.logger.Warn("sessions.changed: DB update failed",
@@ -128,23 +157,27 @@ func (c *WSClient) handleSessionsChanged(payload json.RawMessage) {
continue
}
// Then SSE broadcast
// Then broadcast
c.broker.Broadcast("agent.status", updated)
if d.TaskProgress != nil || d.CurrentTask != "" {
c.broker.Broadcast("agent.progress", updated)
}
c.logger.Debug("sessions.changed: agent updated",
"agentId", d.AgentID, "status", string(agentStatus))
"agentId", d.AgentID,
"status", string(agentStatus))
}
c.logger.Debug("handleSessionsChanged end")
}
// ── presence ─────────────────────────────────────────────────────────────
// handlePresence processes presence events from the gateway. Updates the
// agent's lastActivity and broadcasts status if the connection state changed.
// agent's lastActivity timestamp and broadcasts status if the connection
// state changed.
func (c *WSClient) handlePresence(payload json.RawMessage) {
c.logger.Debug("handlePresence", "payload", string(payload))
c.logger.Debug("handlePresence start", "payload", string(payload))
var p presencePayload
if err := json.Unmarshal(payload, &p); err != nil {
@@ -153,21 +186,31 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
}
if p.AgentID == "" {
c.logger.Debug("presence: skipping event with empty agentId")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// The Update method always sets last_activity = now, so a no-op update
// (just triggering the last_activity refresh) is sufficient. We send
// an empty-ish update — the repo always bumps last_activity.
// If connection state is reported, also update status.
update := models.UpdateAgentRequest{}
// If device disconnected, set agent to idle
if p.Connected != nil && !*p.Connected {
// Device disconnected — set agent to idle
idle := models.AgentStatusIdle
update.Status = &idle
}
// DB update first (Update always bumps last_activity)
// Pass lastActivityAt from the event so DB and SSE stay consistent
if p.LastActivityAt != "" {
update.LastActivityAt = &p.LastActivityAt
}
// Update DB first
updated, err := c.agents.Update(ctx, p.AgentID, update)
if err != nil {
c.logger.Warn("presence: DB update failed",
@@ -175,24 +218,21 @@ func (c *WSClient) handlePresence(payload json.RawMessage) {
return
}
if p.LastActivityAt != "" {
updated.LastActivity = p.LastActivityAt
}
// Then SSE broadcast
// Then broadcast
c.broker.Broadcast("agent.status", updated)
c.logger.Debug("presence: agent updated",
"agentId", p.AgentID, "connected", p.Connected)
"agentId", p.AgentID,
"connected", p.Connected)
}
// ── agent.config ─────────────────────────────────────────────────────────
// handleAgentConfig processes agent.config events from the gateway. Updates
// agent metadata (channel) in the DB and broadcasts a fleet.update with the
// full fleet snapshot.
// agent metadata (name, channel) in the DB and broadcasts a fleet.update
// with the full fleet snapshot.
func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
c.logger.Debug("handleAgentConfig", "payload", string(payload))
c.logger.Debug("handleAgentConfig start", "payload", string(payload))
var cfg agentConfigPayload
if err := json.Unmarshal(payload, &cfg); err != nil {
@@ -201,19 +241,27 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
}
if cfg.ID == "" {
c.logger.Debug("agent.config: skipping event with empty id")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Build partial update with available fields.
update := models.UpdateAgentRequest{}
if cfg.Name != "" {
update.DisplayName = &cfg.Name
}
if cfg.Role != "" {
update.Role = &cfg.Role
}
if cfg.Channel != "" {
update.Channel = &cfg.Channel
}
// DB update first
// Update DB first
updated, err := c.agents.Update(ctx, cfg.ID, update)
if err != nil {
c.logger.Warn("agent.config: DB update failed",
@@ -221,23 +269,19 @@ func (c *WSClient) handleAgentConfig(payload json.RawMessage) {
return
}
// Apply display name/role from config event
if cfg.Name != "" {
updated.DisplayName = cfg.Name
}
if cfg.Role != "" {
updated.Role = cfg.Role
}
// Broadcast full fleet snapshot so frontend gets updated agent info
// Then broadcast fleet snapshot
allAgents, err := c.agents.List(ctx, "")
if err != nil {
c.logger.Warn("agent.config: fleet list failed, broadcasting single agent", "error", err)
c.logger.Warn("agent.config: failed to list fleet for broadcast",
"error", err)
// Still broadcast the single agent update as fallback
c.broker.Broadcast("agent.status", updated)
return
}
c.broker.Broadcast("fleet.update", allAgents)
c.logger.Debug("agent.config: fleet updated", "agentId", cfg.ID, "name", cfg.Name)
c.logger.Debug("agent.config: fleet updated",
"agentId", cfg.ID,
"name", cfg.Name)
}
+516
View File
@@ -0,0 +1,516 @@
package gateway
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
// ── Mock AgentRepo ────────────────────────────────────────────────────────
type mockAgentRepo struct {
mu sync.Mutex
agents map[string]models.AgentCardData
updateCalls []updateCall
}
type updateCall struct {
id string
req models.UpdateAgentRequest
}
func (m *mockAgentRepo) Get(_ context.Context, id string) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
return a, nil
}
func (m *mockAgentRepo) Update(_ context.Context, id string, req models.UpdateAgentRequest) (models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
a, ok := m.agents[id]
if !ok {
return models.AgentCardData{}, errNotFound
}
if req.Status != nil {
a.Status = *req.Status
}
if req.DisplayName != nil {
a.DisplayName = *req.DisplayName
}
if req.Role != nil {
a.Role = *req.Role
}
if req.Channel != nil {
a.Channel = *req.Channel
}
if req.CurrentTask != nil {
a.CurrentTask = req.CurrentTask
}
if req.TaskProgress != nil {
a.TaskProgress = req.TaskProgress
}
if req.TaskElapsed != nil {
a.TaskElapsed = req.TaskElapsed
}
if req.ErrorMessage != nil {
a.ErrorMessage = req.ErrorMessage
}
if req.LastActivityAt != nil {
a.LastActivity = *req.LastActivityAt
}
m.agents[id] = a
m.updateCalls = append(m.updateCalls, updateCall{id, req})
return a, nil
}
func (m *mockAgentRepo) Create(_ context.Context, a models.AgentCardData) error {
m.mu.Lock()
defer m.mu.Unlock()
m.agents[a.ID] = a
return nil
}
func (m *mockAgentRepo) List(_ context.Context, statusFilter models.AgentStatus) ([]models.AgentCardData, error) {
m.mu.Lock()
defer m.mu.Unlock()
var result []models.AgentCardData
for _, a := range m.agents {
if statusFilter == "" || a.Status == statusFilter {
result = append(result, a)
}
}
return result, nil
}
func (m *mockAgentRepo) Delete(_ context.Context, id string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.agents, id)
return nil
}
func (m *mockAgentRepo) Count(_ context.Context) (int, error) {
m.mu.Lock()
defer m.mu.Unlock()
return len(m.agents), nil
}
// errNotFound is returned by the mock repo when an agent is not found.
var errNotFound = fmt.Errorf("not found")
// ── Broadcast capture helper ───────────────────────────────────────────────
// broadcastCapture wraps a real Broker and captures all broadcasts
// via a subscribed channel. Use captured() to retrieve events that have
// been received so far. Call close() to unsubscribe when done.
type broadcastCapture struct {
broker *handler.Broker
ch chan handler.SSEEvent
}
func newBroadcastCapture(broker *handler.Broker) *broadcastCapture {
return &broadcastCapture{
broker: broker,
ch: broker.Subscribe(),
}
}
// captured drains all pending events from the subscription channel
// and returns them. This is synchronous — it only returns events that
// have already been sent to the channel.
func (bc *broadcastCapture) captured() []handler.SSEEvent {
var events []handler.SSEEvent
for {
select {
case evt := <-bc.ch:
events = append(events, evt)
default:
return events
}
}
}
func (bc *broadcastCapture) close() {
bc.broker.Unsubscribe(bc.ch)
}
// ── Test helpers ──────────────────────────────────────────────────────────
// newTestWSClient creates a WSClient wired to a mock repo and a real broker.
// Returns the client, the mock repo, and a broadcast capture.
func newTestWSClient() (*WSClient, *mockAgentRepo, *handler.Broker, *broadcastCapture) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
client := NewWSClient(WSConfig{}, repo, broker, slog.Default())
return client, repo, broker, capture
}
// ── Tests ─────────────────────────────────────────────────────────────────
func TestHandleSessionsChanged_Active(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
payload := json.RawMessage(`{
"sessionKey": "s1",
"agentId": "otto",
"status": "running",
"totalTokens": 500,
"currentTask": "Orchestrating tasks"
}`)
client.handleSessionsChanged(payload)
// Verify: agent status updated to active
repo.mu.Lock()
agent := repo.agents["otto"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusActive)
}
// Verify: update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
if calls[0].id != "otto" {
t.Errorf("update call agentId = %q, want %q", calls[0].id, "otto")
}
// Verify: broker broadcast "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_Idle(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["dex"] = models.AgentCardData{
ID: "dex",
DisplayName: "Dex",
Status: models.AgentStatusActive,
CurrentTask: strPtr("Writing API"),
}
payload := json.RawMessage(`{
"sessionKey": "s2",
"agentId": "dex",
"status": "done",
"totalTokens": 1000
}`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
agent := repo.agents["dex"]
repo.mu.Unlock()
// Verify: agent goes idle
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want %q", agent.Status, models.AgentStatusIdle)
}
// Verify: current task cleared (set to empty string)
if agent.CurrentTask != nil && *agent.CurrentTask != "" {
t.Errorf("current task = %q, want empty (cleared on idle)", *agent.CurrentTask)
}
// Verify: broker fires "agent.status"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandleSessionsChanged_ArrayPayload(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusIdle}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
payload := json.RawMessage(`[
{"sessionKey":"s1","agentId":"otto","status":"running","totalTokens":100},
{"sessionKey":"s2","agentId":"dex","status":"streaming","totalTokens":200}
]`)
client.handleSessionsChanged(payload)
repo.mu.Lock()
otto := repo.agents["otto"]
dex := repo.agents["dex"]
repo.mu.Unlock()
if otto.Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want active", otto.Status)
}
if dex.Status != models.AgentStatusActive {
t.Errorf("dex status = %q, want active", dex.Status)
}
// Both should produce broadcasts
events := capture.captured()
statusCount := 0
for _, evt := range events {
if evt.EventType == "agent.status" {
statusCount++
}
}
if statusCount < 2 {
t.Errorf("expected at least 2 agent.status broadcasts, got %d", statusCount)
}
}
func TestHandleSessionsChanged_SkipsEmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"sessionKey":"s1","agentId":"","status":"running"}`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleSessionsChanged_UnparseablePayload(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`not json at all`)
client.handleSessionsChanged(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for unparseable payload, got %d", len(events))
}
}
func TestHandlePresence(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": true,
"lastActivityAt": "2025-01-01T00:00:00Z"
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Agent should still be active (connected=true doesn't change status)
if agent.Status != models.AgentStatusActive {
t.Errorf("agent status = %q, want active", agent.Status)
}
// Update should have been called (for lastActivityAt)
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broadcast
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status'")
}
}
func TestHandlePresence_Disconnect(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["pip"] = models.AgentCardData{
ID: "pip",
DisplayName: "Pip",
Status: models.AgentStatusActive,
}
payload := json.RawMessage(`{
"agentId": "pip",
"connected": false
}`)
client.handlePresence(payload)
repo.mu.Lock()
agent := repo.agents["pip"]
repo.mu.Unlock()
// Agent should go idle on disconnect
if agent.Status != models.AgentStatusIdle {
t.Errorf("agent status = %q, want idle after disconnect", agent.Status)
}
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "agent.status" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'agent.status' on disconnect")
}
}
func TestHandlePresence_EmptyAgentID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"agentId":"","connected":true}`)
client.handlePresence(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty agentId, got %d", len(events))
}
}
func TestHandleAgentConfig(t *testing.T) {
client, repo, _, capture := newTestWSClient()
defer capture.close()
repo.agents["rex"] = models.AgentCardData{
ID: "rex",
DisplayName: "Rex",
Role: "Frontend Dev",
Status: models.AgentStatusIdle,
Channel: "discord",
}
payload := json.RawMessage(`{
"id": "rex",
"name": "Rex the Dev",
"role": "Senior Frontend",
"channel": "telegram"
}`)
client.handleAgentConfig(payload)
repo.mu.Lock()
agent := repo.agents["rex"]
calls := make([]updateCall, len(repo.updateCalls))
copy(calls, repo.updateCalls)
repo.mu.Unlock()
// Verify DisplayName and Role updated
if agent.DisplayName != "Rex the Dev" {
t.Errorf("displayName = %q, want %q", agent.DisplayName, "Rex the Dev")
}
if agent.Role != "Senior Frontend" {
t.Errorf("role = %q, want %q", agent.Role, "Senior Frontend")
}
if agent.Channel != "telegram" {
t.Errorf("channel = %q, want %q", agent.Channel, "telegram")
}
// Verify update was called
if len(calls) == 0 {
t.Fatal("expected at least one update call")
}
// Verify broker fires "fleet.update"
events := capture.captured()
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
break
}
}
if !found {
t.Error("expected broker broadcast with event type 'fleet.update'")
}
}
func TestHandleAgentConfig_EmptyID(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"","name":"Ghost"}`)
client.handleAgentConfig(payload)
events := capture.captured()
if len(events) > 0 {
t.Errorf("expected no broadcasts for empty id, got %d", len(events))
}
}
func TestHandleAgentConfig_NotFound(t *testing.T) {
client, _, _, capture := newTestWSClient()
defer capture.close()
payload := json.RawMessage(`{"id":"unknown","name":"Ghost","role":"Phantom"}`)
client.handleAgentConfig(payload)
// Agent doesn't exist in repo, so Update will fail → handler logs warning, returns early
events := capture.captured()
for _, evt := range events {
if evt.EventType == "fleet.update" {
t.Error("fleet.update should not be broadcast when agent update fails")
}
}
}
+23 -14
View File
@@ -16,6 +16,8 @@ import (
// ── RPC response types ───────────────────────────────────────────────────
// agentListItem represents a single agent returned by the agents.list RPC.
// Fields are extracted gracefully from json.RawMessage so unknown fields
// from the gateway are silently ignored.
type agentListItem struct {
ID string `json:"id"`
Name string `json:"name"`
@@ -40,9 +42,14 @@ type sessionListItem struct {
// persists them, merges session state into agent cards, and broadcasts
// the merged fleet as a fleet.update event.
func (c *WSClient) initialSync(ctx context.Context) error {
if c.agents == nil {
c.logger.Info("initial sync skipped (no repository)")
return nil
}
c.logger.Info("initial sync starting")
// 1. Fetch agents via RPC
// 1. Fetch agents
agentsRaw, err := c.Send("agents.list", nil)
if err != nil {
return fmt.Errorf("agents.list RPC: %w", err)
@@ -55,7 +62,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("agents.list received", "count", len(agentItems))
// 2. Persist each agent (create if not exists, update if changed)
// 2. Persist each agent
for _, item := range agentItems {
card := agentItemToCard(item)
@@ -70,12 +77,13 @@ func (c *WSClient) initialSync(ctx context.Context) error {
continue
}
// Agent exists — update display name or role if changed
// Agent exists — update if display name or role changed
if existing.DisplayName != card.DisplayName || existing.Role != card.Role {
// Update what we can via UpdateAgentRequest
channel := card.Channel
newName := card.DisplayName
newRole := card.Role
_, updateErr := c.agents.Update(ctx, card.ID, models.UpdateAgentRequest{
Channel: &channel,
DisplayName: &newName,
Role: &newRole,
})
if updateErr != nil {
c.logger.Warn("sync: agent update failed", "id", card.ID, "error", updateErr)
@@ -83,7 +91,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
}
}
// 3. Fetch sessions via RPC
// 3. Fetch sessions
sessionsRaw, err := c.Send("sessions.list", nil)
if err != nil {
return fmt.Errorf("sessions.list RPC: %w", err)
@@ -96,7 +104,7 @@ func (c *WSClient) initialSync(ctx context.Context) error {
c.logger.Info("sessions.list received", "count", len(sessionItems))
// 4. Build agentId → session map for merge
// 4. Build a map of agentId → session for merge
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
@@ -104,25 +112,26 @@ func (c *WSClient) initialSync(ctx context.Context) error {
}
}
// 5. Merge session state into agents, update DB, and collect for broadcast
// 5. Merge session state into agents and update + broadcast
mergedAgents := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
// Merge session state into agent card
// Merge session state
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
// Use totalTokens as a rough progress indicator
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
prog := min(session.TotalTokens/100, 100) // normalize to 0-100
card.TaskProgress = &prog
}
}
// Persist merged status change
// Persist merged state
existing, err := c.agents.Get(ctx, card.ID)
if err == nil && existing.Status != card.Status {
status := card.Status
@@ -168,7 +177,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
}
channel := item.Channel
if channel == "" {
channel = "discord"
channel = "unknown"
}
name := item.Name
if name == "" {
@@ -179,7 +188,7 @@ func agentItemToCard(item agentListItem) models.AgentCardData {
ID: item.ID,
DisplayName: name,
Role: role,
Status: models.AgentStatusIdle, // default; overridden by session merge
Status: models.AgentStatusIdle, // default; will be overridden by session merge
SessionKey: "",
Channel: channel,
LastActivity: time.Now().UTC().Format(time.RFC3339),
+236
View File
@@ -0,0 +1,236 @@
package gateway
import (
"context"
"testing"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
)
func TestInitialSync(t *testing.T) {
_ = &mockAgentRepo{agents: make(map[string]models.AgentCardData)} // verify mock compiles
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// --- Test agentItemToCard + session merge (the core of initialSync) ---
agentItems := []agentListItem{
{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"},
{ID: "dex", Name: "Dex", Role: "Backend Dev", Channel: "telegram"},
}
sessionItems := []sessionListItem{
{SessionKey: "s1", AgentID: "otto", Status: "running", TotalTokens: 500, LastActivityAt: "2025-05-20T12:00:00Z"},
{SessionKey: "s2", AgentID: "dex", Status: "done", TotalTokens: 1000, LastActivityAt: "2025-05-20T11:00:00Z"},
}
// Build sessionByAgent map (mirrors initialSync logic)
sessionByAgent := make(map[string]sessionListItem)
for _, s := range sessionItems {
if s.AgentID != "" {
sessionByAgent[s.AgentID] = s
}
}
// Merge and verify
merged := make([]models.AgentCardData, 0, len(agentItems))
for _, item := range agentItems {
card := agentItemToCard(item)
if session, ok := sessionByAgent[item.ID]; ok {
card.SessionKey = session.SessionKey
card.Status = mapSessionStatus(session.Status)
card.LastActivity = session.LastActivityAt
if session.TotalTokens > 0 {
prog := min(session.TotalTokens/100, 100)
card.TaskProgress = &prog
}
}
merged = append(merged, card)
}
// Verify otto: running → active
if merged[0].ID != "otto" {
t.Errorf("merged[0].ID = %q, want %q", merged[0].ID, "otto")
}
if merged[0].Status != models.AgentStatusActive {
t.Errorf("otto status = %q, want %q (running → active)", merged[0].Status, models.AgentStatusActive)
}
if merged[0].SessionKey != "s1" {
t.Errorf("otto sessionKey = %q, want %q", merged[0].SessionKey, "s1")
}
if merged[0].TaskProgress == nil || *merged[0].TaskProgress != 5 {
t.Errorf("otto taskProgress = %v, want 5", merged[0].TaskProgress)
}
// Verify dex: done → idle
if merged[1].ID != "dex" {
t.Errorf("merged[1].ID = %q, want %q", merged[1].ID, "dex")
}
if merged[1].Status != models.AgentStatusIdle {
t.Errorf("dex status = %q, want %q (done → idle)", merged[1].Status, models.AgentStatusIdle)
}
if merged[1].SessionKey != "s2" {
t.Errorf("dex sessionKey = %q, want %q", merged[1].SessionKey, "s2")
}
}
func TestInitialSync_PersistCreatesNew(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Simulate the persist logic from initialSync:
// new agents should be created
card := agentItemToCard(agentListItem{ID: "otto", Name: "Otto", Role: "Orchestrator", Channel: "discord"})
ctx := context.Background()
// Agent doesn't exist → create
_, err := repo.Get(ctx, card.ID)
if err == nil {
t.Fatal("expected agent to not exist yet")
}
if err := repo.Create(ctx, card); err != nil {
t.Fatalf("Create failed: %v", err)
}
got, err := repo.Get(ctx, card.ID)
if err != nil {
t.Fatalf("Get after Create failed: %v", err)
}
if got.ID != "otto" {
t.Errorf("got.ID = %q, want %q", got.ID, "otto")
}
if got.DisplayName != "Otto" {
t.Errorf("got.DisplayName = %q, want %q", got.DisplayName, "Otto")
}
if got.Role != "Orchestrator" {
t.Errorf("got.Role = %q, want %q", got.Role, "Orchestrator")
}
}
func TestInitialSync_PersistUpdatesExisting(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
ctx := context.Background()
// Pre-populate with existing agent
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Old Role",
Status: models.AgentStatusIdle,
}
// Simulate initialSync: agent exists, name/role changed → update
newName := "Otto Prime"
newRole := "Super Orchestrator"
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
DisplayName: &newName,
Role: &newRole,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get after Update failed: %v", err)
}
if got.DisplayName != "Otto Prime" {
t.Errorf("displayName = %q, want %q", got.DisplayName, "Otto Prime")
}
if got.Role != "Super Orchestrator" {
t.Errorf("role = %q, want %q", got.Role, "Super Orchestrator")
}
}
func TestInitialSync_MergesSessionStatus(t *testing.T) {
// When initialSync merges session state, an agent whose existing status
// differs from the session-derived status should be updated.
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
ctx := context.Background()
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Role: "Orchestrator",
Status: models.AgentStatusIdle,
}
// Simulate session merge: session says "running" → agent should go active
activeStatus := mapSessionStatus("running")
if activeStatus != models.AgentStatusActive {
t.Fatalf("mapSessionStatus(running) = %q, want active", activeStatus)
}
_, err := repo.Update(ctx, "otto", models.UpdateAgentRequest{
Status: &activeStatus,
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
got, err := repo.Get(ctx, "otto")
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if got.Status != models.AgentStatusActive {
t.Errorf("status after merge = %q, want %q", got.Status, models.AgentStatusActive)
}
}
func TestInitialSync_BroadcastsFleet(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Create some agents in the repo
repo.agents["otto"] = models.AgentCardData{ID: "otto", DisplayName: "Otto", Status: models.AgentStatusActive}
repo.agents["dex"] = models.AgentCardData{ID: "dex", DisplayName: "Dex", Status: models.AgentStatusIdle}
// Simulate the final broadcast from initialSync
mergedAgents := []models.AgentCardData{
repo.agents["otto"],
repo.agents["dex"],
}
broker.Broadcast("fleet.update", mergedAgents)
events := capture.captured()
if len(events) == 0 {
t.Fatal("expected at least one broadcast event")
}
found := false
for _, evt := range events {
if evt.EventType == "fleet.update" {
found = true
// Verify data is the merged agents list
agents, ok := evt.Data.([]models.AgentCardData)
if !ok {
t.Fatalf("fleet.update data type = %T, want []models.AgentCardData", evt.Data)
}
if len(agents) != 2 {
t.Errorf("fleet.update agents count = %d, want 2", len(agents))
}
break
}
}
if !found {
t.Error("expected fleet.update broadcast event")
}
}
+87 -50
View File
@@ -1,7 +1,7 @@
// Package gateway provides WebSocket client integration with the OpenClaw
// gateway using WS protocol v3. The WSClient handles connection, handshake,
// frame routing, request/response correlation, and automatic reconnection
// with exponential backoff (1s → 30s max).
// with exponential backoff.
package gateway
import (
@@ -15,8 +15,8 @@ import (
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/repository"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/google/uuid"
)
// WSConfig holds WebSocket client configuration, typically loaded from
@@ -41,7 +41,7 @@ type eventHandler func(json.RawMessage)
// WSClient connects to the OpenClaw gateway over WebSocket, completes the
// v3 handshake, routes incoming frames, and automatically reconnects on
// disconnect with exponential backoff (1s → 30s max).
// disconnect with exponential backoff.
type WSClient struct {
config WSConfig
conn *websocket.Conn
@@ -51,9 +51,11 @@ type WSClient struct {
agents repository.AgentRepo
broker *handler.Broker
logger *slog.Logger
handlers map[string][]eventHandler
connID string // set after successful hello-ok
connId string // set after successful hello-ok
restClient *Client // optional REST client to notify on WS ready
wsReadyOnce sync.Once // ensures MarkWSReady close is one-shot
}
// NewWSClient returns a WSClient wired to the given repository and broker.
@@ -79,7 +81,7 @@ func (c *WSClient) SetRESTClient(rest *Client) {
// OnEvent registers a handler for the given event name. Handlers are called
// when an incoming frame with type "event" and matching event name is
// received. Safe to call before Start.
// received. This is safe to call before Start.
func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -92,7 +94,7 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
type wsFrame struct {
Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req/res frames)
Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
@@ -138,11 +140,12 @@ type helloOKResponse struct {
// ── Start loop ───────────────────────────────────────────────────────────
// Start connects to the gateway, completes the handshake, and begins the
// read loop. On disconnect it reconnects with exponential backoff (1s → 30s).
// On ctx cancellation it performs a clean shutdown.
// read loop. On disconnect it reconnects with exponential backoff. On
// ctx cancellation it performs a clean shutdown.
func (c *WSClient) Start(ctx context.Context) {
backoff := 1 * time.Second
initialBackoff := 1 * time.Second
maxBackoff := 30 * time.Second
backoff := initialBackoff
for {
err := c.connectAndRun(ctx)
@@ -154,6 +157,9 @@ func (c *WSClient) Start(ctx context.Context) {
c.logger.Warn("ws client disconnected, reconnecting",
"error", err,
"backoff", backoff)
} else {
// Reset backoff on successful connect+run completion
backoff = initialBackoff
}
select {
@@ -188,14 +194,26 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
c.conn = conn
c.connMu.Unlock()
defer conn.Close()
// When context is cancelled, close the conn to unblock ReadJSON in readLoop.
go func() {
<-ctx.Done()
c.connMu.Lock()
if c.conn != nil {
c.conn.Close()
}
c.connMu.Unlock()
}()
defer func() {
conn.Close()
}()
// Step 1: Read the connect.challenge frame
if err := c.readChallenge(conn); err != nil {
return fmt.Errorf("handshake challenge: %w", err)
}
// Step 2: Send connect request and read hello-ok response
// Step 2: Send connect request
helloOK, err := c.sendConnect(conn)
if err != nil {
return fmt.Errorf("handshake connect: %w", err)
@@ -206,26 +224,50 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
"methods", helloOK.Features.Methods,
"events", helloOK.Features.Events)
// Store connId for reference
c.connMu.Lock()
c.connID = helloOK.ConnID
c.connId = helloOK.ConnID
c.connMu.Unlock()
// Notify REST client that WS is live so it stands down
// Step 2b: Register live event handlers BEFORE starting the read
// loop. This eliminates the race window where readLoop dispatches
// live events as "unhandled" because no handlers are registered yet.
// The handlers only depend on c.agents and c.broker, which are wired
// in the constructor — they do not need initialSync to have completed.
c.registerEventHandlers()
// Step 2c: Start the read loop in a goroutine so that Send() in
// initialSync can receive responses. The read loop goroutine will
// continue running after initialSync completes, routing live events
// and any future RPC responses. Because handlers are already
// registered, any events arriving during or after initialSync are
// dispatched correctly.
readLoopErrCh := make(chan error, 1)
go func() {
readLoopErrCh <- c.readLoop(ctx, conn)
}()
// Step 2d: Initial sync — fetch agents + sessions from gateway.
// This works because the read loop is active and will route
// response frames back to Send() via handleResponse.
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
}
// Notify REST client that WS is live so it stands down.
// This must happen AFTER initialSync so that the REST poller
// doesn't start polling while we're still syncing.
if c.restClient != nil {
c.restClient.MarkWSReady()
c.logger.Info("ws client notified REST fallback to stand down")
}
// Step 3: Initial sync — fetch agents + sessions from gateway
if err := c.initialSync(ctx); err != nil {
c.logger.Warn("initial sync failed, continuing with read loop", "error", err)
}
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
c.wsReadyOnce = sync.Once{}
// Step 4: Register live event handlers
c.registerEventHandlers()
// Step 5: Read loop — blocks until disconnect or ctx cancel
return c.readLoop(ctx, conn)
// Step 3: Wait for the read loop goroutine to finish (blocks
// until the connection drops or context is cancelled).
return <-readLoopErrCh
}
// readChallenge reads the first frame from the gateway, which must be a
@@ -240,7 +282,7 @@ func (c *WSClient) readChallenge(conn *websocket.Conn) error {
return fmt.Errorf("expected connect.challenge, got type=%s event=%s", frame.Type, frame.Event)
}
c.logger.Debug("received connect.challenge")
c.logger.Debug("received connect.challenge", "params", string(frame.Params))
return nil
}
@@ -293,6 +335,8 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
return nil, fmt.Errorf("response id mismatch: expected %s, got %s", reqID, resFrame.ID)
}
// Check for hello-ok method in the result
// The gateway responds with method "hello-ok" on success
var helloOK helloOKResponse
if err := json.Unmarshal(resFrame.Result, &helloOK); err != nil {
return nil, fmt.Errorf("parse hello-ok: %w", err)
@@ -302,25 +346,16 @@ func (c *WSClient) sendConnect(conn *websocket.Conn) (*helloOKResponse, error) {
}
// readLoop continuously reads frames from the connection and routes them.
// It returns on read error or context cancellation.
// It returns on read error or when the connection is closed by the ctx-done
// goroutine started in connectAndRun.
func (c *WSClient) readLoop(ctx context.Context, conn *websocket.Conn) error {
for {
select {
case <-ctx.Done():
// Clean shutdown: send close frame
c.connMu.Lock()
c.conn.WriteControl(
websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutdown"),
time.Now().Add(5*time.Second),
)
c.connMu.Unlock()
return ctx.Err()
default:
}
var frame wsFrame
if err := conn.ReadJSON(&frame); err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
// Check if it's a close error
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
c.logger.Info("ws connection closed by server")
return nil
@@ -344,7 +379,7 @@ func (c *WSClient) routeFrame(frame wsFrame) {
case "event":
c.handleEvent(frame)
default:
c.logger.Debug("unknown frame type", "type", frame.Type, "id", frame.ID)
c.logger.Warn("unknown frame type", "type", frame.Type, "id", frame.ID)
}
}
@@ -363,6 +398,7 @@ func (c *WSClient) handleResponse(frame wsFrame) {
}
if frame.Error != nil {
// Send nil to signal error; caller checks via Send return
ch <- nil
return
}
@@ -386,21 +422,18 @@ func (c *WSClient) handleEvent(frame wsFrame) {
}
}
// ── Send (RPC) ──────────────────────────────────────────────────────────
// ── Send ─────────────────────────────────────────────────────────────────
// Send sends a JSON-RPC request to the gateway and returns the response
// payload. It is safe for concurrent use.
// Send sends a JSON request to the gateway and returns the response payload.
// It is safe for concurrent use. Returns an error if the client is not
// connected.
func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
reqID := uuid.New().String()
var paramsJSON json.RawMessage
if params != nil {
var err error
paramsJSON, err = json.Marshal(params)
paramsJSON, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal params: %w", err)
}
}
// Register pending response channel
respCh := make(chan json.RawMessage, 1)
@@ -423,7 +456,11 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
}
c.connMu.Lock()
err := c.conn.WriteJSON(frame)
if c.conn == nil {
c.connMu.Unlock()
return nil, fmt.Errorf("gateway: not connected")
}
err = c.conn.WriteJSON(frame)
c.connMu.Unlock()
if err != nil {
@@ -434,10 +471,10 @@ func (c *WSClient) Send(method string, params any) (json.RawMessage, error) {
select {
case resp := <-respCh:
if resp == nil {
return nil, fmt.Errorf("gateway returned error for request %s (%s)", reqID, method)
return nil, fmt.Errorf("gateway returned error for request %s", reqID)
}
return resp, nil
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("request %s (%s) timed out", reqID, method)
return nil, fmt.Errorf("request %s timed out", reqID)
}
}
@@ -0,0 +1,715 @@
package gateway
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"net/http/httptest"
"strings"
"sync/atomic"
"testing"
"time"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
"github.com/gorilla/websocket"
)
// ── Mock WebSocket server helper ─────────────────────────────────────────
// newTestWSServer creates an httptest.Server that upgrades to WebSocket and
// delegates each connection to handler. The server URL can be converted to
// a ws:// URL by replacing "http" with "ws".
func newTestWSServer(t *testing.T, handler func(conn *websocket.Conn)) *httptest.Server {
t.Helper()
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
handler(conn)
}))
return srv
}
// wsURL converts an httptest.Server http URL to a ws URL.
func wsURL(srv *httptest.Server) string {
return "ws" + strings.TrimPrefix(srv.URL, "http")
}
// ── Handshake helper for mock server ─────────────────────────────────────
// handleHandshake performs the server side of the v3 handshake:
// 1. Send connect.challenge
// 2. Read connect request
// 3. Send hello-ok response
//
// Returns the connect request frame for inspection.
func handleHandshake(t *testing.T, conn *websocket.Conn) map[string]any {
t.Helper()
// 1. Send connect.challenge
challenge := map[string]any{
"type": "event",
"event": "connect.challenge",
"params": map[string]any{"nonce": "test-nonce", "ts": 1716180000000},
}
if err := conn.WriteJSON(challenge); err != nil {
t.Fatalf("server: write challenge: %v", err)
}
// 2. Read connect request
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
t.Fatalf("server: read connect request: %v", err)
}
if req["method"] != "connect" {
t.Fatalf("server: expected method=connect, got %v", req["method"])
}
// 3. Send hello-ok response
// Note: helloOKResponse expects ConnID at the top level of the result,
// matching the WSClient's JSON struct tags.
result := map[string]any{
"type": "hello-ok",
"protocol": 3,
"connId": "test-conn-123",
"features": map[string]any{"methods": []string{}, "events": []string{}},
"auth": map[string]any{"role": "operator", "scopes": []string{"operator.read"}},
}
res := map[string]any{
"type": "res",
"id": req["id"],
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
t.Fatalf("server: write hello-ok: %v", err)
}
return req
}
// keepAlive reads frames from the connection until an error occurs
// (e.g., the client disconnects). Used as the default "do nothing"
// server loop after handshake.
func keepAlive(conn *websocket.Conn) {
for {
var m map[string]any
if err := conn.ReadJSON(&m); err != nil {
break
}
}
}
// ── 1. Test: Full handshake ──────────────────────────────────────────────
func TestWSClient_Handshake(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait briefly for handshake to complete
time.Sleep(200 * time.Millisecond)
// Verify connId was set
client.connMu.Lock()
connID := client.connId
client.connMu.Unlock()
if connID != "test-conn-123" {
t.Errorf("expected connId 'test-conn-123', got %q", connID)
}
cancel()
select {
case <-done:
// Client exited cleanly
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down after context cancellation")
}
}
// ── 2. Test: Send() with response matching ───────────────────────────────
func TestWSClient_Send(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = map[string]any{
"agents": []map[string]any{
{"id": "otto", "name": "Otto"},
},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
resp, err := client.Send("agents.list", nil)
if err != nil {
t.Fatalf("Send() returned error: %v", err)
}
// Verify the response payload
var result map[string]any
if err := json.Unmarshal(resp, &result); err != nil {
t.Fatalf("unmarshal response: %v", err)
}
agents, ok := result["agents"].([]any)
if !ok || len(agents) != 1 {
t.Errorf("expected 1 agent in response, got %v", result)
}
cancel()
}
// ── 3. Test: Event handler routing ───────────────────────────────────────
func TestWSClient_EventRouting(t *testing.T) {
eventReceived := make(chan json.RawMessage, 1)
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// After handshake, send a test event
evt := map[string]any{
"type": "event",
"event": "test.event",
"params": map[string]any{"greeting": "hello from server"},
}
if err := conn.WriteJSON(evt); err != nil {
t.Logf("server: write event: %v", err)
return
}
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
// Register event handler BEFORE starting the client
client.OnEvent("test.event", func(payload json.RawMessage) {
eventReceived <- payload
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
go client.Start(ctx)
// Wait for the event handler to fire
select {
case payload := <-eventReceived:
var data map[string]any
if err := json.Unmarshal(payload, &data); err != nil {
t.Fatalf("unmarshal event payload: %v", err)
}
if greeting, _ := data["greeting"].(string); greeting != "hello from server" {
t.Errorf("expected greeting 'hello from server', got %q", greeting)
}
case <-time.After(3 * time.Second):
t.Fatal("timed out waiting for event handler to fire")
}
cancel()
}
// ── 4. Test: Concurrent Send ─────────────────────────────────────────────
func TestWSClient_ConcurrentSend(t *testing.T) {
var reqCount atomic.Int32
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
// Read RPC requests and respond to each
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
n := reqCount.Add(1)
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{"index": n, "method": req["method"]},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go client.Start(ctx)
// Give the client time to complete handshake
time.Sleep(300 * time.Millisecond)
// Fire 3 concurrent Send() calls
type sendResult struct {
method string
payload json.RawMessage
err error
}
results := make(chan sendResult, 3)
methods := []string{"agents.list", "sessions.list", "agents.config"}
for _, method := range methods {
go func(m string) {
resp, err := client.Send(m, nil)
results <- sendResult{method: m, payload: resp, err: err}
}(method)
}
// Collect all results
for i := 0; i < 3; i++ {
select {
case r := <-results:
if r.err != nil {
t.Errorf("Send(%q) returned error: %v", r.method, r.err)
continue
}
var result map[string]any
if err := json.Unmarshal(r.payload, &result); err != nil {
t.Errorf("Send(%q) unmarshal error: %v", r.method, err)
continue
}
gotMethod, _ := result["method"].(string)
if gotMethod != r.method {
t.Errorf("Send(%q) got response for %q (mismatched)", r.method, gotMethod)
}
case <-time.After(5 * time.Second):
t.Fatal("timed out waiting for concurrent Send results")
}
}
cancel()
}
// ── 5. Test: Clean shutdown ──────────────────────────────────────────────
func TestWSClient_CleanShutdown(t *testing.T) {
srv := newTestWSServer(t, func(conn *websocket.Conn) {
handleHandshake(t, conn)
keepAlive(conn)
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, nil, nil, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Let the client connect and complete handshake
time.Sleep(200 * time.Millisecond)
// Cancel context — should trigger clean shutdown
cancel()
select {
case <-done:
// Client exited cleanly — pass
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly within timeout")
}
}
// ── Pure utility tests (from CUB-205) ─────────────────────────────────────
func TestMapSessionStatus(t *testing.T) {
tests := []struct {
input string
expected models.AgentStatus
}{
{"running", models.AgentStatusActive},
{"streaming", models.AgentStatusActive},
{"done", models.AgentStatusIdle},
{"error", models.AgentStatusError},
{"", models.AgentStatusIdle},
{"garbage", models.AgentStatusIdle},
}
for _, tt := range tests {
result := mapSessionStatus(tt.input)
if result != tt.expected {
t.Errorf("mapSessionStatus(%q) = %q, want %q", tt.input, result, tt.expected)
}
}
}
func TestAgentItemToCard(t *testing.T) {
t.Run("full fields", func(t *testing.T) {
item := agentListItem{
ID: "dex",
Name: "Dex",
Role: "backend",
Channel: "telegram",
}
card := agentItemToCard(item)
if card.ID != "dex" {
t.Errorf("ID = %q, want %q", card.ID, "dex")
}
if card.DisplayName != "Dex" {
t.Errorf("DisplayName = %q, want %q", card.DisplayName, "Dex")
}
if card.Role != "backend" {
t.Errorf("Role = %q, want %q", card.Role, "backend")
}
if card.Channel != "telegram" {
t.Errorf("Channel = %q, want %q", card.Channel, "telegram")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty fields use defaults", func(t *testing.T) {
item := agentListItem{
ID: "otto",
}
card := agentItemToCard(item)
if card.ID != "otto" {
t.Errorf("ID = %q, want %q", card.ID, "otto")
}
if card.DisplayName != "otto" {
t.Errorf("DisplayName = %q, want %q (should fallback to ID)", card.DisplayName, "otto")
}
if card.Role != "agent" {
t.Errorf("Role = %q, want %q (default)", card.Role, "agent")
}
if card.Channel != "unknown" {
t.Errorf("Channel = %q, want %q (per Grimm requirement)", card.Channel, "unknown")
}
if card.Status != models.AgentStatusIdle {
t.Errorf("Status = %q, want %q", card.Status, models.AgentStatusIdle)
}
})
t.Run("empty name falls back to ID", func(t *testing.T) {
item := agentListItem{
ID: "hex",
Name: "",
Role: "database",
}
card := agentItemToCard(item)
if card.DisplayName != "hex" {
t.Errorf("DisplayName = %q, want %q (ID fallback)", card.DisplayName, "hex")
}
})
}
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
// completes initial sync successfully. This test would hang/timeout if
// readLoop were NOT started before initialSync, because Send() relies on
// readLoop→routeFrame→handleResponse to deliver RPC responses.
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, respond to RPCs
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
var result any
switch method {
case "agents.list":
result = []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
}
case "sessions.list":
result = []map[string]any{
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
}
default:
result = map[string]any{}
}
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": result,
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for initial sync to complete by checking repo state.
// The agents should be persisted from the RPC responses.
deadline := time.Now().Add(5 * time.Second)
for time.Now().Before(deadline) {
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if ottoOK && dexOK {
break
}
time.Sleep(50 * time.Millisecond)
}
repo.mu.Lock()
_, ottoOK := repo.agents["otto"]
_, dexOK := repo.agents["dex"]
repo.mu.Unlock()
if !ottoOK {
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
}
if !dexOK {
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
// arriving during initial sync are NOT dropped. This is a regression test
// for the race where readLoop started before registerEventHandlers(),
// causing events read during that window to be logged as "unhandled" and lost.
//
// The mock server sends a live event (sessions.changed) right after the
// handshake, interleaved with the RPC responses for agents.list and
// sessions.list. The test asserts the event is received by the handler.
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
broker := handler.NewBroker()
capture := newBroadcastCapture(broker)
defer capture.close()
// Pre-seed an agent so the event handler can update it.
repo.agents["otto"] = models.AgentCardData{
ID: "otto",
DisplayName: "Otto",
Status: models.AgentStatusIdle,
}
srv := newTestWSServer(t, func(conn *websocket.Conn) {
// Handshake
handleHandshake(t, conn)
// After handshake, process RPCs and inject a live event.
for {
var req map[string]any
if err := conn.ReadJSON(&req); err != nil {
break
}
reqID, _ := req["id"].(string)
method, _ := req["method"].(string)
// Respond to agents.list RPC
if method == "agents.list" {
// Before responding, inject a live event — simulates
// a gateway pushing a presence update during sync.
evt := map[string]any{
"type": "event",
"event": "presence",
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
}
if err := conn.WriteJSON(evt); err != nil {
break
}
// Now send the RPC response
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Respond to sessions.list RPC
if method == "sessions.list" {
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": []map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
continue
}
// Default response for other methods
res := map[string]any{
"type": "res",
"id": reqID,
"ok": true,
"result": map[string]any{},
}
if err := conn.WriteJSON(res); err != nil {
break
}
}
})
defer srv.Close()
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
client.Start(ctx)
close(done)
}()
// Wait for the presence event to be processed by checking the repo.
// The presence handler updates the agent, so we check for the
// lastActivityAt change.
deadline := time.Now().Add(5 * time.Second)
var lastActivity string
for time.Now().Before(deadline) {
repo.mu.Lock()
if a, ok := repo.agents["otto"]; ok {
lastActivity = a.LastActivity
}
repo.mu.Unlock()
if lastActivity == "2025-05-20T12:30:00Z" {
break
}
time.Sleep(50 * time.Millisecond)
}
if lastActivity != "2025-05-20T12:30:00Z" {
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
}
cancel()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Fatal("WSClient did not shut down cleanly")
}
}
func TestStrPtr(t *testing.T) {
s := "hello"
p := strPtr(s)
if p == nil {
t.Fatal("strPtr returned nil")
}
if *p != s {
t.Errorf("strPtr(%q) = %q, want %q", s, *p, s)
}
empty := ""
ep := strPtr(empty)
if *ep != empty {
t.Errorf("strPtr(empty) = %q, want %q", *ep, empty)
}
}
+3
View File
@@ -64,6 +64,9 @@ type CreateAgentRequest struct {
// UpdateAgentRequest is the payload for PUT /api/agents/{id}.
type UpdateAgentRequest struct {
Status *AgentStatus `json:"status,omitempty" validate:"omitempty,agentStatus"`
DisplayName *string `json:"displayName,omitempty"`
Role *string `json:"role,omitempty"`
LastActivityAt *string `json:"lastActivityAt,omitempty"`
CurrentTask *string `json:"currentTask,omitempty"`
TaskProgress *int `json:"taskProgress,omitempty" validate:"omitempty,min=0,max=100"`
TaskElapsed *string `json:"taskElapsed,omitempty"`
+46
View File
@@ -0,0 +1,46 @@
# Control Center — Architecture Context
## Current State
The Control Center backend uses a **dual-path gateway client** architecture:
- **Primary path**: WebSocket client (`gateway.WSClient`) connects to the OpenClaw gateway using WS protocol v3. It handles handshake, initial sync (agents.list + sessions.list RPCs), live event routing (sessions.changed, presence, agent.config), and automatic reconnection with exponential backoff.
- **Fallback path**: REST poller (`gateway.Client`) polls the gateway `/api/agents` endpoint on an interval. It only activates if the WS client fails to connect within 30 seconds of startup.
## Live Gateway Connection
### Startup Sequence
1. Both WS client and REST client start concurrently
2. REST client waits 30s for WS readiness signal (`wsReady` channel)
3. If WS connects successfully → REST client stands down (logs "using WS — REST poller standing down")
4. If WS fails within 30s → REST client falls back to polling (logs "WS not ready — falling back to REST polling")
5. If no WS client configured → REST client polls immediately
### WebSocket Client (Primary)
- Config: `WS_GATEWAY_URL` (default: `ws://host.docker.internal:18789/`), `OPENCLAW_GATEWAY_TOKEN`
- Protocol: v3 handshake (challenge → connect → hello-ok)
- Initial sync: `agents.list` + `sessions.list` RPCs → persist → merge → broadcast `fleet.update`
- Live events: `sessions.changed`, `presence`, `agent.config`
- Reconnection: exponential backoff (1s → 2s → 4s → ... → 30s max)
### REST Poller (Fallback)
- Config: `GATEWAY_URL` (default: `http://host.docker.internal:18789/api/agents`), `GATEWAY_POLL_INTERVAL` (default: 5s)
- Only used when WS is unavailable
- Polls the `/api/agents` endpoint and syncs agent status changes
### Wiring
```
main.go
├── wsClient = NewWSClient(...)
├── restClient = NewClient(...)
├── wsClient.SetRESTClient(restClient) // WS notifies REST on ready
├── restClient.SetWSClient(wsClient) // REST defers to WS
├── go wsClient.Start(ctx) // primary
└── go restClient.Start(ctx) // fallback (waits for WS)
```
## Key Design Decisions
- **Push over poll**: WS is preferred for real-time updates; REST is a safety net
- **DB first, then SSE**: All event handlers persist to DB before broadcasting
- **Graceful degradation**: System works without WS; REST provides basic functionality
- **No hard dependency on REST /api/agents**: If WS is connected, REST endpoint is never called