Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9d3ddc4340 | |||
| ffc127f12d | |||
| 724a4a9427 |
+2
-2
@@ -32,7 +32,7 @@ GATEWAY_POLL_INTERVAL=5s
|
|||||||
# When using docker-compose, these are set in the services section
|
# When using docker-compose, these are set in the services section
|
||||||
# See docker-compose.yml for service-specific environment variables
|
# See docker-compose.yml for service-specific environment variables
|
||||||
|
|
||||||
# ── Database Configuration ───────────────────────────────────────────────
|
# ── Database Configuration ─────────────────────────────────────────────
|
||||||
# Set in the db service environment section of docker-compose.yml
|
# Set in the db service environment section of docker-compose.yml
|
||||||
# POSTGRES_USER=controlcenter
|
# POSTGRES_USER=controlcenter
|
||||||
# POSTGRES_PASSWORD=controlcenter
|
# POSTGRES_PASSWORD=controlcenter
|
||||||
@@ -47,4 +47,4 @@ GATEWAY_POLL_INTERVAL=5s
|
|||||||
# For Docker deployment:
|
# For Docker deployment:
|
||||||
# 1. Copy .env.example to .env (backend only)
|
# 1. Copy .env.example to .env (backend only)
|
||||||
# 2. Run: docker compose up -d
|
# 2. Run: docker compose up -d
|
||||||
# 3. Access frontend at http://localhost:3000
|
# 3. Access frontend at http://localhost:3000
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ func main() {
|
|||||||
<-quit
|
<-quit
|
||||||
slog.Info("shutting down server...")
|
slog.Info("shutting down server...")
|
||||||
|
|
||||||
cancel() // stop gateway clients
|
cancel() // stop gateway polling
|
||||||
|
|
||||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
defer shutdownCancel()
|
defer shutdownCancel()
|
||||||
@@ -136,4 +136,4 @@ func parseLogLevel(level string) slog.Level {
|
|||||||
default:
|
default:
|
||||||
return slog.LevelInfo
|
return slog.LevelInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,30 +10,30 @@ import (
|
|||||||
|
|
||||||
// Config holds all application configuration.
|
// Config holds all application configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Port int
|
Port int
|
||||||
DatabaseURL string
|
DatabaseURL string
|
||||||
CORSOrigin string
|
CORSOrigin string
|
||||||
LogLevel string
|
LogLevel string
|
||||||
Environment string
|
Environment string
|
||||||
GatewayRestURL string
|
GatewayRestURL string
|
||||||
GatewayRestPollInterval time.Duration
|
GatewayRestPollInterval time.Duration
|
||||||
WSGatewayURL string
|
WSGatewayURL string
|
||||||
WSGatewayToken string
|
WSGatewayToken string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load reads configuration from environment variables, applying defaults where
|
// Load reads configuration from environment variables, applying defaults where
|
||||||
// values are not set. All secrets come from the environment — nothing is hardcoded.
|
// values are not set. All secrets come from the environment — nothing is hardcoded.
|
||||||
func Load() *Config {
|
func Load() *Config {
|
||||||
return &Config{
|
return &Config{
|
||||||
Port: getEnvInt("PORT", 8080),
|
Port: getEnvInt("PORT", 8080),
|
||||||
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
DatabaseURL: getEnv("DATABASE_URL", "postgres://controlcenter:controlcenter@localhost:5432/controlcenter?sslmode=disable"),
|
||||||
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
CORSOrigin: getEnv("CORS_ORIGIN", "*"),
|
||||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||||
Environment: getEnv("ENVIRONMENT", "development"),
|
Environment: getEnv("ENVIRONMENT", "development"),
|
||||||
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
GatewayRestURL: getEnv("GATEWAY_URL", "http://host.docker.internal:18789/api/agents"),
|
||||||
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
GatewayRestPollInterval: getEnvDuration("GATEWAY_POLL_INTERVAL", 5*time.Second),
|
||||||
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
WSGatewayURL: getEnv("WS_GATEWAY_URL", "ws://host.docker.internal:18789/"),
|
||||||
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
WSGatewayToken: getEnv("OPENCLAW_GATEWAY_TOKEN", ""),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -60,4 +60,4 @@ func getEnvDuration(key string, fallback time.Duration) time.Duration {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fallback
|
return fallback
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,10 +1,6 @@
|
|||||||
// Package gateway provides an OpenClaw gateway integration client that
|
// Package gateway provides an OpenClaw gateway integration client that
|
||||||
// polls agent states, persists them via the repository layer, and broadcasts
|
// polls agent states, persists them via the repository layer, and broadcasts
|
||||||
// changes through the SSE broker for real-time frontend updates.
|
// changes through the SSE broker for real-time frontend updates.
|
||||||
//
|
|
||||||
// When a WSClient is wired via SetWSClient, the REST poller becomes a
|
|
||||||
// fallback: it waits for the WS client to signal readiness, and only starts
|
|
||||||
// polling if WS fails to connect within 30 seconds.
|
|
||||||
package gateway
|
package gateway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -33,7 +29,7 @@ type Client struct {
|
|||||||
broker *handler.Broker
|
broker *handler.Broker
|
||||||
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
wsClient *WSClient // optional WS client; when set, REST is fallback only
|
||||||
wsReady chan struct{} // closed once WS connection is established
|
wsReady chan struct{} // closed once WS connection is established
|
||||||
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
wsReadyOnce sync.Once // protects wsReady close from double-close race
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds gateway client configuration, typically loaded from environment.
|
// Config holds gateway client configuration, typically loaded from environment.
|
||||||
@@ -144,6 +140,7 @@ func (c *Client) poll(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, ga := range agents {
|
for _, ga := range agents {
|
||||||
|
// Check if agent already exists; if so, update; otherwise create.
|
||||||
existing, err := c.agents.Get(ctx, ga.ID)
|
existing, err := c.agents.Get(ctx, ga.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Not found — create it
|
// Not found — create it
|
||||||
@@ -188,51 +185,51 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
slog.Info("seeding demo agents")
|
slog.Info("seeding demo agents")
|
||||||
demoAgents := []models.AgentCardData{
|
demoAgents := []models.AgentCardData{
|
||||||
{
|
{
|
||||||
ID: "otto",
|
ID: "otto",
|
||||||
DisplayName: "Otto",
|
DisplayName: "Otto",
|
||||||
Role: "Orchestrator",
|
Role: "Orchestrator",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Orchestrating tasks"),
|
CurrentTask: strPtr("Orchestrating tasks"),
|
||||||
SessionKey: "otto-session",
|
SessionKey: "otto-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "rex",
|
ID: "rex",
|
||||||
DisplayName: "Rex",
|
DisplayName: "Rex",
|
||||||
Role: "Frontend Dev",
|
Role: "Frontend Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "rex-session",
|
SessionKey: "rex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-10 * time.Minute).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "dex",
|
ID: "dex",
|
||||||
DisplayName: "Dex",
|
DisplayName: "Dex",
|
||||||
Role: "Backend Dev",
|
Role: "Backend Dev",
|
||||||
Status: models.AgentStatusThinking,
|
Status: models.AgentStatusThinking,
|
||||||
CurrentTask: strPtr("Designing API contracts"),
|
CurrentTask: strPtr("Designing API contracts"),
|
||||||
SessionKey: "dex-session",
|
SessionKey: "dex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "hex",
|
ID: "hex",
|
||||||
DisplayName: "Hex",
|
DisplayName: "Hex",
|
||||||
Role: "Database Specialist",
|
Role: "Database Specialist",
|
||||||
Status: models.AgentStatusActive,
|
Status: models.AgentStatusActive,
|
||||||
CurrentTask: strPtr("Reviewing schema migrations"),
|
CurrentTask: strPtr("Reviewing schema migrations"),
|
||||||
SessionKey: "hex-session",
|
SessionKey: "hex-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "pip",
|
ID: "pip",
|
||||||
DisplayName: "Pip",
|
DisplayName: "Pip",
|
||||||
Role: "Edge Device Dev",
|
Role: "Edge Device Dev",
|
||||||
Status: models.AgentStatusIdle,
|
Status: models.AgentStatusIdle,
|
||||||
SessionKey: "pip-session",
|
SessionKey: "pip-session",
|
||||||
Channel: "discord",
|
Channel: "discord",
|
||||||
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
LastActivity: time.Now().UTC().Add(-1 * time.Hour).Format(time.RFC3339),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -246,4 +243,4 @@ func SeedDemoAgents(ctx context.Context, agents repository.AgentRepo) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func strPtr(s string) *string { return &s }
|
func strPtr(s string) *string { return &s }
|
||||||
|
|||||||
@@ -92,10 +92,10 @@ func (c *WSClient) OnEvent(event string, handler func(json.RawMessage)) {
|
|||||||
|
|
||||||
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
// wsFrame represents a generic WebSocket frame in the OpenClaw v3 protocol.
|
||||||
type wsFrame struct {
|
type wsFrame struct {
|
||||||
Type string `json:"type"` // "req", "res", "event"
|
Type string `json:"type"` // "req", "res", "event"
|
||||||
ID string `json:"id,omitempty"` // request/response correlation
|
ID string `json:"id,omitempty"` // request/response correlation
|
||||||
Method string `json:"method,omitempty"` // method name (req frames)
|
Method string `json:"method,omitempty"` // method name (req frames)
|
||||||
Event string `json:"event,omitempty"` // event name (event frames)
|
Event string `json:"event,omitempty"` // event name (event frames)
|
||||||
Params json.RawMessage `json:"params,omitempty"`
|
Params json.RawMessage `json:"params,omitempty"`
|
||||||
Result json.RawMessage `json:"result,omitempty"`
|
Result json.RawMessage `json:"result,omitempty"`
|
||||||
Error *wsError `json:"error,omitempty"`
|
Error *wsError `json:"error,omitempty"`
|
||||||
@@ -229,34 +229,7 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
c.connId = helloOK.ConnID
|
c.connId = helloOK.ConnID
|
||||||
c.connMu.Unlock()
|
c.connMu.Unlock()
|
||||||
|
|
||||||
// Step 2b: Register live event handlers BEFORE starting the read
|
// Notify REST client that WS is live so it stands down
|
||||||
// loop. This eliminates the race window where readLoop dispatches
|
|
||||||
// live events as "unhandled" because no handlers are registered yet.
|
|
||||||
// The handlers only depend on c.agents and c.broker, which are wired
|
|
||||||
// in the constructor — they do not need initialSync to have completed.
|
|
||||||
c.registerEventHandlers()
|
|
||||||
|
|
||||||
// Step 2c: Start the read loop in a goroutine so that Send() in
|
|
||||||
// initialSync can receive responses. The read loop goroutine will
|
|
||||||
// continue running after initialSync completes, routing live events
|
|
||||||
// and any future RPC responses. Because handlers are already
|
|
||||||
// registered, any events arriving during or after initialSync are
|
|
||||||
// dispatched correctly.
|
|
||||||
readLoopErrCh := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
readLoopErrCh <- c.readLoop(ctx, conn)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Step 2d: Initial sync — fetch agents + sessions from gateway.
|
|
||||||
// This works because the read loop is active and will route
|
|
||||||
// response frames back to Send() via handleResponse.
|
|
||||||
if err := c.initialSync(ctx); err != nil {
|
|
||||||
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify REST client that WS is live so it stands down.
|
|
||||||
// This must happen AFTER initialSync so that the REST poller
|
|
||||||
// doesn't start polling while we're still syncing.
|
|
||||||
if c.restClient != nil {
|
if c.restClient != nil {
|
||||||
c.restClient.MarkWSReady()
|
c.restClient.MarkWSReady()
|
||||||
c.logger.Info("ws client notified REST fallback to stand down")
|
c.logger.Info("ws client notified REST fallback to stand down")
|
||||||
@@ -265,9 +238,16 @@ func (c *WSClient) connectAndRun(ctx context.Context) error {
|
|||||||
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
// Reset wsReadyOnce so MarkWSReady can fire again after a reconnect
|
||||||
c.wsReadyOnce = sync.Once{}
|
c.wsReadyOnce = sync.Once{}
|
||||||
|
|
||||||
// Step 3: Wait for the read loop goroutine to finish (blocks
|
// Step 2b: Initial sync — fetch agents + sessions from gateway
|
||||||
// until the connection drops or context is cancelled).
|
if err := c.initialSync(ctx); err != nil {
|
||||||
return <-readLoopErrCh
|
c.logger.Warn("initial sync failed, will continue with read loop", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 2c: Register live event handlers
|
||||||
|
c.registerEventHandlers()
|
||||||
|
|
||||||
|
// Step 3: Read loop
|
||||||
|
return c.readLoop(ctx, conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
// readChallenge reads the first frame from the gateway, which must be a
|
// readChallenge reads the first frame from the gateway, which must be a
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/handler"
|
|
||||||
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
"code.cubecraftcreations.com/CubeCraft-Creations/Control-Center/go-backend/internal/models"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
@@ -467,236 +466,6 @@ func TestAgentItemToCard(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── 6. Test: Initial sync ordering (readLoop active before Send) ──────────
|
|
||||||
|
|
||||||
// TestConnectAndRun_InitialSyncOrdering verifies that the WS client
|
|
||||||
// completes initial sync successfully. This test would hang/timeout if
|
|
||||||
// readLoop were NOT started before initialSync, because Send() relies on
|
|
||||||
// readLoop→routeFrame→handleResponse to deliver RPC responses.
|
|
||||||
func TestConnectAndRun_InitialSyncOrdering(t *testing.T) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
// Handshake
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
|
|
||||||
// After handshake, respond to RPCs
|
|
||||||
for {
|
|
||||||
var req map[string]any
|
|
||||||
if err := conn.ReadJSON(&req); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
reqID, _ := req["id"].(string)
|
|
||||||
method, _ := req["method"].(string)
|
|
||||||
|
|
||||||
var result any
|
|
||||||
switch method {
|
|
||||||
case "agents.list":
|
|
||||||
result = []map[string]any{
|
|
||||||
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
|
||||||
{"id": "dex", "name": "Dex", "role": "Backend Dev", "channel": "telegram"},
|
|
||||||
}
|
|
||||||
case "sessions.list":
|
|
||||||
result = []map[string]any{
|
|
||||||
{"sessionKey": "s1", "agentId": "otto", "status": "running", "totalTokens": 500, "lastActivityAt": "2025-05-20T12:00:00Z"},
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
result = map[string]any{}
|
|
||||||
}
|
|
||||||
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": result,
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
client.Start(ctx)
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for initial sync to complete by checking repo state.
|
|
||||||
// The agents should be persisted from the RPC responses.
|
|
||||||
deadline := time.Now().Add(5 * time.Second)
|
|
||||||
for time.Now().Before(deadline) {
|
|
||||||
repo.mu.Lock()
|
|
||||||
_, ottoOK := repo.agents["otto"]
|
|
||||||
_, dexOK := repo.agents["dex"]
|
|
||||||
repo.mu.Unlock()
|
|
||||||
if ottoOK && dexOK {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
repo.mu.Lock()
|
|
||||||
_, ottoOK := repo.agents["otto"]
|
|
||||||
_, dexOK := repo.agents["dex"]
|
|
||||||
repo.mu.Unlock()
|
|
||||||
|
|
||||||
if !ottoOK {
|
|
||||||
t.Error("otto not found in repo after initial sync — readLoop may not have been active before Send()")
|
|
||||||
}
|
|
||||||
if !dexOK {
|
|
||||||
t.Error("dex not found in repo after initial sync — readLoop may not have been active before Send()")
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatal("WSClient did not shut down cleanly")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── 7. Test: Event not lost during initial sync (regression) ───────────────
|
|
||||||
|
|
||||||
// TestConnectAndRun_EventNotLostDuringSync verifies that live gateway events
|
|
||||||
// arriving during initial sync are NOT dropped. This is a regression test
|
|
||||||
// for the race where readLoop started before registerEventHandlers(),
|
|
||||||
// causing events read during that window to be logged as "unhandled" and lost.
|
|
||||||
//
|
|
||||||
// The mock server sends a live event (sessions.changed) right after the
|
|
||||||
// handshake, interleaved with the RPC responses for agents.list and
|
|
||||||
// sessions.list. The test asserts the event is received by the handler.
|
|
||||||
func TestConnectAndRun_EventNotLostDuringSync(t *testing.T) {
|
|
||||||
repo := &mockAgentRepo{agents: make(map[string]models.AgentCardData)}
|
|
||||||
broker := handler.NewBroker()
|
|
||||||
capture := newBroadcastCapture(broker)
|
|
||||||
defer capture.close()
|
|
||||||
|
|
||||||
// Pre-seed an agent so the event handler can update it.
|
|
||||||
repo.agents["otto"] = models.AgentCardData{
|
|
||||||
ID: "otto",
|
|
||||||
DisplayName: "Otto",
|
|
||||||
Status: models.AgentStatusIdle,
|
|
||||||
}
|
|
||||||
|
|
||||||
srv := newTestWSServer(t, func(conn *websocket.Conn) {
|
|
||||||
// Handshake
|
|
||||||
handleHandshake(t, conn)
|
|
||||||
|
|
||||||
// After handshake, process RPCs and inject a live event.
|
|
||||||
for {
|
|
||||||
var req map[string]any
|
|
||||||
if err := conn.ReadJSON(&req); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
reqID, _ := req["id"].(string)
|
|
||||||
method, _ := req["method"].(string)
|
|
||||||
|
|
||||||
// Respond to agents.list RPC
|
|
||||||
if method == "agents.list" {
|
|
||||||
// Before responding, inject a live event — simulates
|
|
||||||
// a gateway pushing a presence update during sync.
|
|
||||||
evt := map[string]any{
|
|
||||||
"type": "event",
|
|
||||||
"event": "presence",
|
|
||||||
"params": map[string]any{"agentId": "otto", "connected": true, "lastActivityAt": "2025-05-20T12:30:00Z"},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(evt); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now send the RPC response
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": []map[string]any{
|
|
||||||
{"id": "otto", "name": "Otto", "role": "Orchestrator", "channel": "discord"},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Respond to sessions.list RPC
|
|
||||||
if method == "sessions.list" {
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": []map[string]any{},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Default response for other methods
|
|
||||||
res := map[string]any{
|
|
||||||
"type": "res",
|
|
||||||
"id": reqID,
|
|
||||||
"ok": true,
|
|
||||||
"result": map[string]any{},
|
|
||||||
}
|
|
||||||
if err := conn.WriteJSON(res); err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
defer srv.Close()
|
|
||||||
|
|
||||||
client := NewWSClient(WSConfig{URL: wsURL(srv), AuthToken: "test-token"}, repo, broker, slog.Default())
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
client.Start(ctx)
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for the presence event to be processed by checking the repo.
|
|
||||||
// The presence handler updates the agent, so we check for the
|
|
||||||
// lastActivityAt change.
|
|
||||||
deadline := time.Now().Add(5 * time.Second)
|
|
||||||
var lastActivity string
|
|
||||||
for time.Now().Before(deadline) {
|
|
||||||
repo.mu.Lock()
|
|
||||||
if a, ok := repo.agents["otto"]; ok {
|
|
||||||
lastActivity = a.LastActivity
|
|
||||||
}
|
|
||||||
repo.mu.Unlock()
|
|
||||||
if lastActivity == "2025-05-20T12:30:00Z" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
}
|
|
||||||
|
|
||||||
if lastActivity != "2025-05-20T12:30:00Z" {
|
|
||||||
t.Errorf("presence event during sync was lost: lastActivity = %q, want %q", lastActivity, "2025-05-20T12:30:00Z")
|
|
||||||
}
|
|
||||||
|
|
||||||
cancel()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(3 * time.Second):
|
|
||||||
t.Fatal("WSClient did not shut down cleanly")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStrPtr(t *testing.T) {
|
func TestStrPtr(t *testing.T) {
|
||||||
s := "hello"
|
s := "hello"
|
||||||
p := strPtr(s)
|
p := strPtr(s)
|
||||||
|
|||||||
Reference in New Issue
Block a user