Compare commits

..

3 Commits

Author SHA1 Message Date
overseer 9d3ddc4340 Merge branch 'dev' into agent/rex/CUB-125-realtime-sse
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
2026-05-20 12:52:12 -04:00
Rex ffc127f12d CUB-125: address Grimm review — tests, type fixes, error state circuit breaker
Dev Build & Deploy / test-and-build (pull_request) Failing after 0s
Dev Build & Deploy / docker-build-push (pull_request) Has been skipped
- Add missing 'offline' to AgentStatus union type (types/index.ts)
- Add max-retry circuit breaker to useSSE; error state is now reachable
- Wire typed SSE payloads (SSEPayloadMap discriminated union) into useRealtimeSync
- Add Vitest + 20 unit tests: useSSE lifecycle, back-off, circuit breaker,
  event parsing, cleanup; useRealtimeSync event-to-invalidation mapping
- Rebase on dev to remove stale CUB-119 legacy-deletion commit and align
  CI workflow (dev already consolidated into single dev.yml)
- Tests: npm test → 20/20 pass; Build: npm run build → 0 errors
2026-05-20 16:51:13 +00:00
overseer 724a4a9427 CUB-125: implement real-time SSE/WebSocket in React frontend
- Add useSSE hook with exponential back-off reconnect (1s → 30s)
- Add useRealtimeSync hook: maps SSE events to React Query invalidation
  (agent.status → agents; agent.task/agent.progress → tasks+agents; fleet.update → all)
- Add SSEContext/SSEProvider so connection status is available app-wide
- Mount SSEProvider in main.tsx inside QueryClientProvider (no polling)
- Show live/connecting/reconnecting/disconnected badge in sidebar + mobile header
- Update SettingsPage: replace polling interval UI with SSE status panel
- Disable React Query polling (staleTime 60s); all updates pushed via SSE

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-20 16:32:12 +00:00
5 changed files with 55 additions and 58 deletions
+2 -2
View File
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
# When using docker-compose, these are set in the services section # When using docker-compose, these are set in the services section
# See docker-compose.yml for service-specific environment variables # See docker-compose.yml for service-specific environment variables
# ── Database Configuration ─────────────────────────────────────────────── # ── Database Configuration ─────────────────────────────────────────────
# Set in the db service environment section of docker-compose.yml # Set in the db service environment section of docker-compose.yml
# POSTGRES_USER=controlcenter # POSTGRES_USER=controlcenter
# POSTGRES_PASSWORD=controlcenter # POSTGRES_PASSWORD=controlcenter
@@ -47,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s
# For Docker deployment: # For Docker deployment:
# 1. Copy .env.example to .env (backend only) # 1. Copy .env.example to .env (backend only)
# 2. Run: docker compose up -d # 2. Run: docker compose up -d
# 3. Access frontend at http://localhost:3000 # 3. Access frontend at http://localhost:3000
+2 -2
View File
@@ -112,7 +112,7 @@ func main() {
<-quit <-quit
slog.Info("shutting down server...") slog.Info("shutting down server...")
cancel() // stop gateway clients cancel() // stop gateway polling
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second) shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
defer shutdownCancel() defer shutdownCancel()
@@ -136,4 +136,4 @@ func parseLogLevel(level string) slog.Level {
default: default:
return slog.LevelInfo return slog.LevelInfo
} }
} }
+19 -19
View File
@@ -10,30 +10,30 @@ import (
// Config holds all application configuration. // Config holds all application configuration.
type Config struct { type Config struct {
Port int Port int
DatabaseURL string DatabaseURL string
CORSOrigin string CORSOrigin string
LogLevel string LogLevel string
Environment string Environment string
GatewayRestURL string GatewayRestURL string
GatewayRestPollInterval time.Duration GatewayRestPollInterval time.Duration
WSGatewayURL string WSGatewayURL string
WSGatewayToken string WSGatewayToken string
} }
// Load reads configuration from environment variables, applying defaults where // Load reads configuration from environment variables, applying defaults where
// values are not set. All secrets come from the environment — nothing is hardcoded. // values are not set. All secrets come from the environment — nothing is hardcoded.
func Load() *Config { func Load() *Config {
return &Config{ return &Config{
Port: getEnvInt("PORT", 8080), Port: getEnvInt("PORT", 8080),
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"), DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
CORSOrigin: getEnv("CORS_ORIGIN", "*"), CORSOrigin: getEnv("CORS_ORIGIN", "*"),
LogLevel: getEnv("LOG_LEVEL", "info"), LogLevel: getEnv("LOG_LEVEL", "info"),
Environment: getEnv("ENVIRONMENT", "development"), Environment: getEnv("ENVIRONMENT", "development"),
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"), GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second), GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"), WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""), WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
} }
} }
@@ -60,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration {
} }
} }
return fallback return fallback
} }
+28 -31
View File
@@ -1,10 +1,6 @@
// Package gateway provides an OpenClaw gateway integration client that // Package gateway provides an OpenClaw gateway integration client that
// polls agent states, persists them via the repository layer, and broadcasts // polls agent states, persists them via the repository layer, and broadcasts
// changes through the SSE broker for real-time frontend updates. // changes through the SSE broker for real-time frontend updates.
//
// When a WSClient is wired via SetWSClient, the REST poller becomes a
// fallback: it waits for the WS client to signal readiness, and only starts
// polling if WS fails to connect within 30 seconds.
package gateway package gateway
import ( import (
@@ -33,7 +29,7 @@ type Client struct {
broker *handler.Broker broker *handler.Broker
wsClient *WSClient // optional WS client; when set, REST is fallback only wsClient *WSClient // optional WS client; when set, REST is fallback only
wsReady chan struct{} // closed once WS connection is established wsReady chan struct{} // closed once WS connection is established
wsReadyOnce sync.Once // protects wsReady close from double-close race wsReadyOnce sync.Once // protects wsReady close from double-close race
} }
// Config holds gateway client configuration, typically loaded from environment. // Config holds gateway client configuration, typically loaded from environment.
@@ -144,6 +140,7 @@ func (c *Client) poll(ctx context.Context) {
} }
for _, ga := range agents { for _, ga := range agents {
// Check if agent already exists; if so, update; otherwise create.
existing, err := c.agents.Get(ctx, ga.ID) existing, err := c.agents.Get(ctx, ga.ID)
if err != nil { if err != nil {
// Not found — create it // Not found — create it
@@ -188,51 +185,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
slog.Info("seeding demo agents") slog.Info("seeding demo agents")
demoAgents := []models.AgentCardData{ demoAgents := []models.AgentCardData{
{ {
ID: "otto", ID: "otto",
DisplayName: "Otto", DisplayName: "Otto",
Role: "Orchestrator", Role: "Orchestrator",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Orchestrating tasks"), CurrentTask: strPtr("Orchestrating tasks"),
SessionKey: "otto-session", SessionKey: "otto-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "rex", ID: "rex",
DisplayName: "Rex", DisplayName: "Rex",
Role: "Frontend Dev", Role: "Frontend Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "rex-session", SessionKey: "rex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
}, },
{ {
ID: "dex", ID: "dex",
DisplayName: "Dex", DisplayName: "Dex",
Role: "Backend Dev", Role: "Backend Dev",
Status: models.AgentStatusThinking, Status: models.AgentStatusThinking,
CurrentTask: strPtr("Designing API contracts"), CurrentTask: strPtr("Designing API contracts"),
SessionKey: "dex-session", SessionKey: "dex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "hex", ID: "hex",
DisplayName: "Hex", DisplayName: "Hex",
Role: "Database Specialist", Role: "Database Specialist",
Status: models.AgentStatusActive, Status: models.AgentStatusActive,
CurrentTask: strPtr("Reviewing schema migrations"), CurrentTask: strPtr("Reviewing schema migrations"),
SessionKey: "hex-session", SessionKey: "hex-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Format(time.RFC3339), LastActivity: time.Now().UTC().Format(time.RFC3339),
}, },
{ {
ID: "pip", ID: "pip",
DisplayName: "Pip", DisplayName: "Pip",
Role: "Edge Device Dev", Role: "Edge Device Dev",
Status: models.AgentStatusIdle, Status: models.AgentStatusIdle,
SessionKey: "pip-session", SessionKey: "pip-session",
Channel: "discord", Channel: "discord",
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339), LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
}, },
} }
@@ -246,4 +243,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
return nil return nil
} }
func strPtr(s string) *string { return &s } func strPtr(s string) *string { return &s }
+4 -4
View File
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol. // wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
type wsFrame struct { type wsFrame struct {
Type string `json:"type"` // "req", "res", "event" Type string `json:"type"` // "req", "res", "event"
ID string `json:"id,omitempty"` // request/response correlation ID string `json:"id,omitempty"` // request/response correlation
Method string `json:"method,omitempty"` // method name (req frames) Method string `json:"method,omitempty"` // method name (req frames)
Event string `json:"event,omitempty"` // event name (event frames) Event string `json:"event,omitempty"` // event name (event frames)
Params json.RawMessage `json:"params,omitempty"` Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"` Result json.RawMessage `json:"result,omitempty"`
Error *wsError `json:"error,omitempty"` Error *wsError `json:"error,omitempty"`