Initial Commit

This commit is contained in:
2026-06-23 23:54:37 +00:00
commit 3c9742ff87
38 changed files with 4792 additions and 0 deletions
+14
View File
@@ -0,0 +1,14 @@
"""ctxd — Context Daemon."""
from .cli import cli_entry, build_parser
from .config import CtxConfig
from . import db
def daemon_entry():
"""Direct entry point for ctxd (starts server)."""
import sys
from .server import serve_sync
cfg = CtxConfig.from_home()
if not cfg.db_path.exists():
print("Not initialized. Run 'ctx init' first.")
sys.exit(1)
serve_sync(cfg)
+18
View File
@@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""ctxd module entry point — CLI when args given, daemon otherwise."""
import sys
if len(sys.argv) > 1 and sys.argv[1] in ('init', 'serve', 'project-list', 'project-create',
'read', 'cat', 'edit', 'search', 'sync', 'audit',
'user-list', 'user-create', 'import-vault'):
from ctxd.cli import cli_entry
cli_entry()
else:
from ctxd.config import CtxConfig
from ctxd.server import serve_sync
cfg = CtxConfig.from_home()
if not cfg.db_path.exists():
print("Not initialized. Run 'ctx init' first.", file=sys.stderr)
sys.exit(1)
serve_sync(cfg)
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
+501
View File
@@ -0,0 +1,501 @@
"""
dossier — CLI for Context Dossier.
"""
import argparse
import json
import os
import sys
import textwrap
from pathlib import Path
from . import db as _db
from .config import CtxConfig, DEFAULT_HOME
_WELCOME_CONTENT = """# Welcome
Welcome to ctxd! This is the shared context store for your projects.
## Getting Started
1. **Create a project:** `ctx project-create my-project --name "My Project"`
2. **Edit context:** `ctx edit my-project`
3. **Start the daemon:** `ctx serve`
4. **Sync to repo:** `ctx sync my-project /path/to/project/root`
## Commands
| Command | Description |
|---|---|
| `ctx init` | Initialize the context store |
| `ctx serve` | Start the daemon |
| `ctx edit <proj>` | Edit project context |
| `ctx read <proj>` | Read project context |
| `ctx sync <proj> [path]` | Sync to project root |
| `ctx search <query>` | Full-text search |
| `ctx project-list` | List all projects |
| `ctx audit` | Show audit log |
| `ctx user-list` | List users |
"""
_REMOTE_RIG_CONTENT = """# RemoteRig
Multi-camera remote monitoring system.
## Architecture
- **Go backend** for the hub/controller app
- **ESP32-C6 camera nodes** (Seeed Studio XIAO ESP32-C6)
- **Pi Zero 2 W** display hub with 10.1-inch touchscreen
- **Home Assistant** integration via Frigate
## Key Decisions
- Go for production code, Python for diagnostics and hardware validation
- Code in Gitea, hardware/design in Notion, CAD artifacts on Seafile
- Enclosure preserves larger case size for wiring clearance
## Conventions
- Development branches off `dev`, PRs target `dev` never `main`
- All feature work tracked in Linear (team `CUB`)
"""
def _seed_context(conn):
"""Insert seed project context with real newlines."""
conn.execute(
"INSERT OR IGNORE INTO project_context (project_id, content, version, updated_by) VALUES (?, ?, 0, 'system')",
('welcome', _WELCOME_CONTENT),
)
conn.execute(
"INSERT OR IGNORE INTO project_context (project_id, content, version, updated_by) VALUES (?, ?, 0, 'system')",
('remote-rig', _REMOTE_RIG_CONTENT),
)
def cmd_init(args):
"""Initialize ~/.ctx/ with database and seed data."""
home = Path(args.home or DEFAULT_HOME).resolve()
cfg = CtxConfig(home=home)
if cfg.db_path.exists():
print(f"ctxd already initialized at {home}")
print(f" DB: {cfg.db_path}")
print("Run 'ctx serve' to start the daemon.")
return
conn = _db.init_db(cfg)
cfg.save()
# Seed project context with real newlines (SQL strings can't do this)
_seed_context(conn)
conn.commit()
conn.close()
print(f"✓ ctxd initialized at {home}")
print(f" Database: {cfg.db_path}")
print(f" Config: {cfg.config_path}")
print(f" Projects: {cfg.projects_dir}")
print(f" Users: admin (admin), hermes-gateway (service)")
print(f" Projects: welcome, remote-rig (seed)")
print()
print("Next steps:")
print(" 1. Edit project context: ctx edit <project>")
print(" 2. Start the daemon: ctx serve")
print(" 3. Set sync path: ctx sync <project> <path>")
def cmd_serve(args):
"""Start the ctxd daemon."""
cfg = CtxConfig.from_home(args.home)
if not cfg.db_path.exists():
print("Not initialized. Run 'ctx init' first.")
sys.exit(1)
from .server import serve_sync
serve_sync(cfg)
def cmd_project_list(args):
"""List all projects."""
conn = _db.init_db(CtxConfig.from_home(args.home))
projects = _db.project_list(conn)
conn.close()
if not projects:
print("No projects found.")
return
print(f"{'PROJECT ID':<20} {'DISPLAY NAME':<25} {'VERSION':>8} SYNC PATH")
print("-" * 80)
for p in projects:
print(f"{p['project_id']:<20} {p['display_name']:<25} {p['shared_version']:>8} {p.get('sync_path', '') or '-'}")
def cmd_project_create(args):
"""Create a new project."""
conn = _db.init_db(CtxConfig.from_home(args.home))
try:
_db.project_create(conn, args.project_id, args.name or args.project_id, args.description or "")
_db.audit_log(conn, "admin", "create", f"Created project {args.project_id}",
project_id=args.project_id, entity_type="project", entity_id=args.project_id)
conn.commit()
print(f"✓ Project '{args.project_id}' created.")
except Exception as e:
conn.rollback()
print(f"Error: {e}")
finally:
conn.close()
def cmd_context_read(args):
"""Read project context to stdout."""
conn = _db.init_db(CtxConfig.from_home(args.home))
ctx = _db.context_read(conn, args.project_id)
conn.close()
if ctx is None:
print(f"Project '{args.project_id}' not found.")
sys.exit(1)
print(f"--- Project: {args.project_id} (v{ctx['version']}) ---")
print(ctx["content"])
def cmd_context_edit(args):
"""Edit project context in $EDITOR, then auto-merge."""
cfg = CtxConfig.from_home(args.home)
conn = _db.init_db(cfg)
ctx = _db.context_read(conn, args.project_id)
if ctx is None:
print(f"Project '{args.project_id}' not found.")
conn.close()
sys.exit(1)
# Fork workspace
user_id = args.as_user or "admin"
fork = _db.workspace_fork(conn, user_id, args.project_id)
if not fork["ok"]:
print(f"Error forking workspace: {fork.get('error')}")
conn.close()
sys.exit(1)
ws_id = fork["workspace_id"]
# Write current content to tempfile
import tempfile
content = ctx["content"]
with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as tf:
tf.write(content)
temp_path = tf.name
editor = os.environ.get("EDITOR", "vim")
# Determine how to get output back
# If EDITOR ends with 'code -w', handle that
exit_code = os.system(f"{editor} {temp_path}")
if exit_code != 0:
print(f"Editor exited with code {exit_code}, aborting.")
os.unlink(temp_path)
conn.close()
sys.exit(1)
# Read edited content
with open(temp_path) as f:
new_content = f.read()
os.unlink(temp_path)
if new_content == content:
print("No changes. Abandoning workspace.")
_db.workspace_abandon(conn, ws_id)
conn.commit()
conn.close()
return
# Write back to workspace
_db.workspace_write_file(conn, ws_id, new_content)
conn.commit()
# Submit and auto-merge (admin role auto-merges)
result = _db.workspace_submit(conn, ws_id, user_id,
diff_summary=f"Edited via ctx edit {args.project_id}")
if result.get("ok"):
if result.get("action") == "merged":
print(f"✓ Context updated to v{result['new_version']}.")
# Auto-sync if sync_path configured
sync = _db.sync_to_project(conn, args.project_id)
if sync.get("ok"):
print(f" Synced to: {sync['path']}")
conn.commit()
else:
print(f"✓ Submitted for review: {result.get('request_id')}")
conn.commit()
else:
if result.get("error") == "conflict":
print("Conflict: the shared context changed while you were editing.")
print(f" Your base: v{fork['base_version']}, Current: v{result['current_version']}")
print(" Re-read the current version, re-apply your edits, and try again.")
else:
print(f"Error: {result.get('error')}")
conn.rollback()
conn.close()
def cmd_search(args):
"""Full-text search across all context."""
conn = _db.init_db(CtxConfig.from_home(args.home))
results = _db.search(conn, args.query, limit=args.limit)
conn.close()
if not results:
print("No results.")
return
for r in results:
src = r.get("source_type", "?")
pid = r.get("project_id", "?")
path = r.get("file_path", "?")
snippet = (r.get("content") or "")[:200].replace("\n", " ").strip()
print(f"[{src:16s}] {pid}/{path}")
print(f" {snippet}...")
print()
def cmd_sync(args):
"""Sync context to project root (AGENTS.md + symlinks)."""
cfg = CtxConfig.from_home(args.home)
conn = _db.init_db(cfg)
if args.path:
# Set sync path
_db.project_set_sync_path(conn, args.project_id, args.path)
conn.commit()
print(f"✓ Sync path set for '{args.project_id}': {args.path}")
result = _db.sync_to_project(conn, args.project_id)
conn.close()
if result.get("ok"):
print(f"✓ Synced to: {result['path']}")
else:
print(f"Error: {result.get('error')}")
def cmd_audit(args):
"""Show recent audit log entries."""
conn = _db.init_db(CtxConfig.from_home(args.home))
rows = _db.audit_query(conn, limit=args.limit)
conn.close()
if not rows:
print("No audit entries.")
return
print(f"{'TIMESTAMP':<22} {'USER':<16} {'AGENT':<16} {'OPERATION':<10} SUMMARY")
print("-" * 110)
for r in rows:
print(f"{r['created_at']:<22} {r['user_id']:<16} {r['agent_id']:<16} "
f"{r['operation']:<10} {(r['summary'] or '')[:50]}")
def cmd_user_list(args):
"""List users."""
conn = _db.init_db(CtxConfig.from_home(args.home))
users = _db.user_list(conn)
conn.close()
if not users:
print("No users.")
return
for u in users:
print(f"{u['user_id']:<20} {u['display_name']:<25} {u['role']}")
def cmd_user_create(args):
"""Create a new user."""
conn = _db.init_db(CtxConfig.from_home(args.home))
try:
_db.user_create(conn, args.user_id, args.display_name, args.role)
conn.commit()
print(f"✓ User '{args.user_id}' created.")
except Exception as e:
conn.rollback()
print(f"Error: {e}")
finally:
conn.close()
def cmd_import_vault(args):
"""Import context from an existing vault (e.g., OpenClawVault)."""
cfg = CtxConfig.from_home(args.home)
vault = Path(args.vault_path)
if not vault.exists():
print(f"Vault path does not exist: {vault}")
sys.exit(1)
conn = _db.init_db(cfg)
# Discover existing project context files in the vault
known_projects = {
"remote-rig": "RemoteRig",
"hermes-agent": "Hermes Agent",
"extrudex": "Extrudex",
"lasercat": "LaserCat",
"tracehound": "Tracehound",
"home-assistant": "Home Assistant",
"openclaw-control-center": "OpenClaw Control Center",
}
# Check for project directories in 02_Projects/
projects_dir = vault / "02_Projects"
imported = 0
if projects_dir.exists():
for child in sorted(projects_dir.iterdir()):
if not child.is_dir() and child.suffix == ".md":
# Single markdown file
pid = child.stem.lower().replace(" ", "-")
name = child.stem
content = child.read_text()
_ensure_project(conn, pid, name, content)
imported += 1
elif child.is_dir():
# Directory of files
pid = child.name.lower().replace(" ", "-")
name = child.name.replace("_", " ").replace("-", " ").title()
context_file = child / "context.md"
if context_file.exists():
content = context_file.read_text()
_ensure_project(conn, pid, name, content)
imported += 1
else:
# Concatenate all markdown files
parts = []
for md in sorted(child.glob("*.md")):
parts.append(f"<!-- from {md.name} -->\n\n{md.read_text()}")
if parts:
_ensure_project(conn, pid, name, "\n\n---\n\n".join(parts))
imported += 1
# Also check 06_Knowledge for user profile
knowledge_dir = vault / "06_Knowledge"
if knowledge_dir:
joshua_ctx = knowledge_dir / "Joshua Context.md"
if joshua_ctx.exists():
content = joshua_ctx.read_text()
# Set as admin profile
existing = _db.profile_read(conn, "admin")
if existing and existing["content"]:
pass # already has profile
else:
_db.profile_update(conn, "admin", content, base_version=0)
print(f" Imported user profile: Joshua Context.md")
conn.commit()
conn.close()
print(f"✓ Imported {imported} project(s) from {vault}")
def _ensure_project(conn, project_id, name, content):
"""Create project + write context if not already present."""
existing = _db.project_get(conn, project_id)
if existing:
print(f" Skipped (already exists): {project_id}")
else:
_db.project_create(conn, project_id, name)
_db.context_update(conn, project_id, content, "admin", base_version=0)
print(f" Imported: {project_id} ({name})")
# ── CLI dispatcher ────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="dossier",
description="Context Dossier — single source of truth for multi-harness project context",
)
p.add_argument("--home", help="Override ~/.ctx home directory")
sub = p.add_subparsers(dest="command", required=True)
# init
sp = sub.add_parser("init", help="Initialize ~/.ctx/ with database and seed data")
sp.set_defaults(func=cmd_init)
sp.add_argument("--home")
# serve
sp = sub.add_parser("serve", help="Start the ctxd daemon")
sp.set_defaults(func=cmd_serve)
sp.add_argument("--home")
# project list
sp = sub.add_parser("project-list", help="List projects")
sp.set_defaults(func=cmd_project_list)
sp.add_argument("--home")
# project create
sp = sub.add_parser("project-create", help="Create a new project")
sp.set_defaults(func=cmd_project_create)
sp.add_argument("project_id")
sp.add_argument("--name", "-n", help="Display name (defaults to project_id)")
sp.add_argument("--description", "-d", default="")
sp.add_argument("--home")
# context read
sp = sub.add_parser("read", aliases=["cat"], help="Read project context")
sp.set_defaults(func=cmd_context_read)
sp.add_argument("project_id")
sp.add_argument("--home")
# context edit
sp = sub.add_parser("edit", help="Edit project context in $EDITOR")
sp.set_defaults(func=cmd_context_edit)
sp.add_argument("project_id")
sp.add_argument("--as-user", "-u", default="admin", help="Edit as this user")
sp.add_argument("--home")
# search
sp = sub.add_parser("search", help="Full-text search across context")
sp.set_defaults(func=cmd_search)
sp.add_argument("query")
sp.add_argument("--limit", "-l", type=int, default=10)
sp.add_argument("--home")
# sync
sp = sub.add_parser("sync", help="Sync context to project AGENTS.md")
sp.set_defaults(func=cmd_sync)
sp.add_argument("project_id")
sp.add_argument("path", nargs="?", help="Set sync path (absolute)")
sp.add_argument("--home")
# audit
sp = sub.add_parser("audit", help="Show recent audit log")
sp.set_defaults(func=cmd_audit)
sp.add_argument("--limit", "-l", type=int, default=20)
sp.add_argument("--home")
# user list
sp = sub.add_parser("user-list", help="List users")
sp.set_defaults(func=cmd_user_list)
sp.add_argument("--home")
# user create
sp = sub.add_parser("user-create", help="Create a new user")
sp.set_defaults(func=cmd_user_create)
sp.add_argument("user_id")
sp.add_argument("--display-name", "-n", required=True)
sp.add_argument("--role", "-r", default="contributor", choices=["admin", "contributor", "service"])
sp.add_argument("--home")
# import-vault
sp = sub.add_parser("import-vault", help="Import context from Obsidian vault")
sp.set_defaults(func=cmd_import_vault)
sp.add_argument("vault_path", help="Path to vault directory (e.g., ~/OpenClawVault)")
sp.add_argument("--home")
return p
def cli_entry():
parser = build_parser()
args = parser.parse_args()
if hasattr(args, "func"):
args.func(args)
else:
parser.print_help()
if __name__ == "__main__":
cli_entry()
+111
View File
@@ -0,0 +1,111 @@
"""
Configuration for ctxd — context daemon.
"""
import os
from pathlib import Path
# Default home directory (~/.ctx) — overridable via CTXD_HOME env var
DEFAULT_HOME = Path(os.environ.get("CTXD_HOME", Path.home() / ".ctx"))
# Defaults for ctxd.yaml
DEFAULT_CONFIG = {
"server": {
"host": "0.0.0.0",
"port": 9091,
},
"snapshots": {
"min_keep": 5,
"max_keep": 25,
},
"auth": {
"enabled": False,
"api_key": "",
},
"seed": {
"admin_user": "admin",
"admin_display": "Administrator",
"service_user": "hermes-gateway",
"service_display": "Hermes Agent",
},
}
class CtxConfig:
"""Holds resolved paths and config for a ctxd runtime."""
def __init__(self, home: Path | str | None = None, config: dict | None = None):
resolved = Path(home) if home else DEFAULT_HOME
self.home = resolved.resolve()
self._cfg = config or dict(DEFAULT_CONFIG)
# ── Directory layout ──────────────────────────────────────────
@property
def db_path(self) -> Path:
return self.home / "ctxd.db"
@property
def snapshots_dir(self) -> Path:
return self.home / "snapshots"
@property
def projects_dir(self) -> Path:
return self.home / "projects"
@property
def user_dir(self) -> Path:
return self.home / "users"
@property
def config_path(self) -> Path:
return self.home / "ctxd.yaml"
# ── Config accessors ──────────────────────────────────────────
@property
def host(self) -> str:
return self._cfg.get("server", {}).get("host", "127.0.0.1")
@property
def port(self) -> int:
return self._cfg.get("server", {}).get("port", 9091)
@property
def min_snapshots(self) -> int:
return self._cfg.get("snapshots", {}).get("min_keep", 5)
@property
def max_snapshots(self) -> int:
return self._cfg.get("snapshots", {}).get("max_keep", 25)
# ── Auth ──────────────────────────────────────────────────────
@property
def auth_enabled(self) -> bool:
return self._cfg.get("auth", {}).get("enabled", False)
@property
def api_key(self) -> str:
return self._cfg.get("auth", {}).get("api_key", "")
# ── Bootstrap ─────────────────────────────────────────────────
def ensure_dirs(self):
"""Create all required directories if they don't exist."""
for d in [self.home, self.snapshots_dir, self.projects_dir, self.user_dir]:
d.mkdir(parents=True, exist_ok=True)
@classmethod
def from_home(cls, home: Path | str | None = None) -> "CtxConfig":
"""Load from ctxd.yaml if it exists, otherwise use defaults."""
home = Path(home).resolve() if home else DEFAULT_HOME
cfg_path = home / "ctxd.yaml"
if cfg_path.exists():
import yaml
with open(cfg_path) as f:
data = yaml.safe_load(f) or {}
return cls(home=str(home), config=data)
return cls(home=str(home))
def save(self):
"""Write config to ctxd.yaml."""
import yaml
self.ensure_dirs()
with open(self.config_path, "w") as f:
yaml.dump(self._cfg, f, default_flow_style=False, sort_keys=False)
+111
View File
@@ -0,0 +1,111 @@
35;13;39M35;18;41M35;22;43M35;25;44M35;32;46M35;33;47M35;36;48M35;38;49M0;38;49M0;38;49m65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M65;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M64;38;49M0;38;49M0;38;49m35;37;48M35;36;48M35;36;47M35;34;46M35;33;45M35;32;43M35;32;42M35;32;41M35;32;40M35;32;37M35;35;36M35;39;35M35;45;34M35;53;34M35;57;34M35;64;35M35;70;36M35;72;36M35;73;37M35;74;37M"""
Configuration for ctxd — context daemon.
"""
import os
from pathlib import Path
# Default home directory (~/.ctx) — overridable via CTXD_HOME env var
DEFAULT_HOME = Path(os.environ.get("CTXD_HOME", Path.home() / ".ctx"))
# Defaults for ctxd.yaml
DEFAULT_CONFIG = {
"server": {
"host": "0.0.0.0",
"port": 9091,
},
"snapshots": {
"min_keep": 5,
"max_keep": 25,
},
"auth": {
"enabled": False,
"api_key": "",
},
"seed": {
"admin_user": "admin",
"admin_display": "Administrator",
"service_user": "hermes-gateway",
"service_display": "Hermes Agent",
},
}
class CtxConfig:
"""Holds resolved paths and config for a ctxd runtime."""
def __init__(self, home: Path | str | None = None, config: dict | None = None):
resolved = Path(home) if home else DEFAULT_HOME
self.home = resolved.resolve()
self._cfg = config or dict(DEFAULT_CONFIG)
# ── Directory layout ──────────────────────────────────────────
@property
def db_path(self) -> Path:
return self.home / "ctxd.db"
@property
def snapshots_dir(self) -> Path:
return self.home / "snapshots"
@property
def projects_dir(self) -> Path:
return self.home / "projects"
@property
def user_dir(self) -> Path:
return self.home / "users"
@property
def config_path(self) -> Path:
return self.home / "ctxd.yaml"
# ── Config accessors ──────────────────────────────────────────
@property
def host(self) -> str:
return self._cfg.get("server", {}).get("host", "127.0.0.1")
@property
def port(self) -> int:
return self._cfg.get("server", {}).get("port", 9091)
@property
def min_snapshots(self) -> int:
return self._cfg.get("snapshots", {}).get("min_keep", 5)
@property
def max_snapshots(self) -> int:
return self._cfg.get("snapshots", {}).get("max_keep", 25)
# ── Auth ──────────────────────────────────────────────────────
@property
def auth_enabled(self) -> bool:
return self._cfg.get("auth", {}).get("enabled", False)
@property
def api_key(self) -> str:
return self._cfg.get("auth", {}).get("api_key", "")
# ── Bootstrap ─────────────────────────────────────────────────
def ensure_dirs(self):
"""Create all required directories if they don't exist."""
for d in [self.home, self.snapshots_dir, self.projects_dir, self.user_dir]:
d.mkdir(parents=True, exist_ok=True)
@classmethod
def from_home(cls, home: Path | str | None = None) -> "CtxConfig":
"""Load from ctxd.yaml if it exists, otherwise use defaults."""
home = Path(home).resolve() if home else DEFAULT_HOME
cfg_path = home / "ctxd.yaml"
if cfg_path.exists():
import yaml
with open(cfg_path) as f:
data = yaml.safe_load(f) or {}
return cls(home=str(home), config=data)
return cls(home=str(home))
def save(self):
"""Write config to ctxd.yaml."""
import yaml
self.ensure_dirs()
with open(self.config_path, "w") as f:
yaml.dump(self._cfg, f, default_flow_style=False, sort_keys=False)
+906
View File
@@ -0,0 +1,906 @@
"""
Database layer for ctxd — schema init, CRUD, workspace fork/merge, FTS, audit.
All public methods take a sqlite3.Connection as the first argument so callers
control transactions. This module is stateless — all state is in SQLite.
"""
import json
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .config import CtxConfig
# ── Schema ────────────────────────────────────────────────────────────────────
SCHEMA_PATH = Path(__file__).parent / "schema.sql"
def init_db(cfg: CtxConfig) -> sqlite3.Connection:
"""Create ~/.ctx/ dirs + initialize the database from schema.sql."""
cfg.ensure_dirs()
fresh = not cfg.db_path.exists()
conn = sqlite3.connect(str(cfg.db_path))
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode = WAL")
conn.execute("PRAGMA foreign_keys = ON")
if fresh:
with open(SCHEMA_PATH) as f:
conn.executescript(f.read())
else:
# Migration: add metadata_tags column if it doesn't exist
try:
conn.execute("ALTER TABLE projects ADD COLUMN metadata_tags TEXT DEFAULT '[]'")
conn.commit()
except sqlite3.OperationalError:
pass # column already exists
return conn
def now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
# ── Helpers ───────────────────────────────────────────────────────────────────
def _row_to_dict(row: sqlite3.Row | None) -> dict | None:
if row is None:
return None
return dict(row)
# ── Users ─────────────────────────────────────────────────────────────────────
def user_create(conn, user_id: str, display_name: str, role: str = "contributor"):
conn.execute(
"INSERT INTO users (user_id, display_name, role) VALUES (?, ?, ?)",
(user_id, display_name, role),
)
def user_get(conn, user_id: str) -> dict | None:
return _row_to_dict(conn.execute(
"SELECT * FROM users WHERE user_id = ?", (user_id,)
).fetchone())
def user_list(conn) -> list[dict]:
return [dict(r) for r in conn.execute("SELECT * FROM users ORDER BY user_id").fetchall()]
# ── Projects ──────────────────────────────────────────────────────────────────
def project_create(conn, project_id: str, display_name: str, description: str = ""):
conn.execute(
"INSERT INTO projects (project_id, display_name, description) VALUES (?, ?, ?)",
(project_id, display_name, description),
)
# Also create empty shared context
conn.execute(
"INSERT INTO project_context (project_id, content, version) VALUES (?, '', 0)",
(project_id,),
)
def project_get(conn, project_id: str) -> dict | None:
return _row_to_dict(conn.execute(
"SELECT * FROM projects WHERE project_id = ?", (project_id,)
).fetchone())
def project_list(conn) -> list[dict]:
return [dict(r) for r in conn.execute(
"SELECT project_id, display_name, description, shared_version FROM projects ORDER BY project_id"
).fetchall()]
def project_set_sync_path(conn, project_id: str, sync_path: str | None):
conn.execute(
"UPDATE projects SET sync_path = ?, auto_sync = 1 WHERE project_id = ?",
(sync_path, project_id),
)
def project_get_tags(conn, project_id: str) -> list[str]:
"""Get project metadata tags as a list of strings."""
row = conn.execute(
"SELECT metadata_tags FROM projects WHERE project_id = ?", (project_id,)
).fetchone()
if row is None:
return []
tags = row["metadata_tags"]
if not tags:
return []
import json
try:
return json.loads(tags)
except (json.JSONDecodeError, TypeError):
return []
def project_set_tags(conn, project_id: str, tags: list[str]):
"""Set project metadata tags from a list of strings."""
import json
conn.execute(
"UPDATE projects SET metadata_tags = ? WHERE project_id = ?",
(json.dumps(tags), project_id),
)
# ── Metadata header ──────────────────────────────────────────────────────────
def build_metadata_header(project_id: str, display_name: str | None = None,
version: int = 0, updated_at: str | None = None,
tags: list[str] | None = None) -> str:
"""Build a metadata header block matching the vault YAML frontmatter format.
TYPE: PROJECT CONTEXT, PROJECT, STATUS: ACTIVE, LAST-UPDATED, TAGS.
LAST-UPDATED uses the actual updated_at timestamp, falling back to today.
TAGS uses the project's metadata_tags if provided, falling back to project name + CONTEXT."""
from datetime import datetime, timezone
project_upper = (display_name or project_id).upper()
last_updated = (updated_at or datetime.now(timezone.utc).strftime("%Y-%m-%d"))
if "T" in last_updated:
last_updated = last_updated.split("T")[0]
tags_list = tags or [project_upper, "CONTEXT"]
tags_block = "\n".join(f" - {t}" for t in tags_list)
return f"""TYPE: PROJECT CONTEXT
PROJECT: {project_upper}
STATUS: ACTIVE
LAST-UPDATED: {last_updated}
TAGS:
{tags_block}
--
___
"""
METADATA_HEADER_PATTERN = (
r"^(TYPE: PROJECT CONTEXT\n"
r"PROJECT: .+\n"
r"STATUS: .+\n"
r"LAST-UPDATED: .+\n"
r"TAGS:\n"
r"(?: {2,4}- .+\n)*"
r"\n*"
r"(?:(?:--\n___)|---)\n"
r"\n?)"
)
def strip_metadata_header(content: str) -> str:
"""Remove the metadata header block if present at the start of content.
Returns the content without the header."""
import re
stripped = re.sub(METADATA_HEADER_PATTERN, "", content, count=1)
return stripped
# ── Shared Context ────────────────────────────────────────────────────────────
def context_read(conn, project_id: str) -> dict | None:
"""Read the current shared context (the compiled markdown).
Returns with metadata header prepended dynamically.
If content already has a header (including YAML frontmatter from vault imports),
it is replaced with the current dynamic header."""
row = conn.execute(
"SELECT pc.*, p.shared_version, p.display_name FROM project_context pc "
"JOIN projects p ON p.project_id = pc.project_id "
"WHERE pc.project_id = ?", (project_id,)
).fetchone()
if row is None:
return None
result = _row_to_dict(row)
content = result["content"]
# Check if content already starts with a header pattern (ours or generic YAML)
import re
existing_header = re.match(METADATA_HEADER_PATTERN, content, flags=re.DOTALL)
# Get project tags
project_tags = project_get_tags(conn, project_id)
header = build_metadata_header(
project_id=project_id,
display_name=result.get("display_name"),
version=result.get("shared_version", result.get("version", 0)),
updated_at=result.get("updated_at"),
tags=project_tags or None,
)
if existing_header:
# Replace existing header with fresh one
content = header + content[existing_header.end():]
else:
# Strip any leading content (HTML comments, blank lines) followed by
# YAML frontmatter (---\n...\n---\n) — common in vault imports
body = content.lstrip("\n\r ")
# Remove leading HTML comments
body = re.sub(r"^(<!--.*?-->\s*\n*)*", "", body, flags=re.DOTALL)
# Remove YAML frontmatter
body = re.sub(r"^---\s*\n.*?\n---\s*\n?", "", body, flags=re.DOTALL)
# Remove any leading comments again (in case YAML was inside a comment wrapper)
body = re.sub(r"^(<!--.*?-->\s*\n*)*", "", body, flags=re.DOTALL)
body = body.strip()
content = header + body
result["content"] = content
return result
def context_update(conn, project_id: str, new_content: str, updated_by: str,
base_version: int) -> dict:
"""
Update shared context with optimistic concurrency.
Returns {'ok': True, 'new_version': N} or {'ok': False, 'error': 'conflict',
'current_version': N}.
"""
cur = conn.execute(
"SELECT shared_version FROM projects WHERE project_id = ?",
(project_id,)
)
row = cur.fetchone()
if row is None:
return {"ok": False, "error": "not_found"}
current_version = row["shared_version"]
if base_version != current_version:
return {
"ok": False, "error": "conflict",
"current_version": current_version,
}
new_version = current_version + 1
# Strip any metadata header that may have been edited-in from the UI
clean_content = strip_metadata_header(new_content)
# Snapshot current state first
_snapshot_take(conn, project_id, version_from=current_version, version_to=new_version)
# Update project_context
conn.execute(
"UPDATE project_context SET content = ?, version = ?, updated_by = ?, updated_at = ? "
"WHERE project_id = ?",
(clean_content, new_version, updated_by, now(), project_id),
)
# Bump shared version
conn.execute(
"UPDATE projects SET shared_version = ? WHERE project_id = ?",
(new_version, project_id),
)
return {"ok": True, "new_version": new_version, "content": clean_content}
# ── User Profile ──────────────────────────────────────────────────────────────
def profile_read(conn, user_id: str) -> dict | None:
return _row_to_dict(conn.execute(
"SELECT * FROM user_profiles WHERE user_id = ?", (user_id,)
).fetchone())
def profile_update(conn, user_id: str, content: str, base_version: int) -> dict:
cur = conn.execute(
"SELECT version FROM user_profiles WHERE user_id = ?", (user_id,)
)
row = cur.fetchone()
if row is None:
# Create
conn.execute(
"INSERT INTO user_profiles (user_id, content, version) VALUES (?, ?, 1)",
(user_id, content),
)
return {"ok": True, "new_version": 1}
current_version = row["version"]
if base_version != current_version:
return {"ok": False, "error": "conflict", "current_version": current_version}
conn.execute(
"UPDATE user_profiles SET content = ?, version = ?, updated_at = ? WHERE user_id = ?",
(content, current_version + 1, now(), user_id),
)
return {"ok": True, "new_version": current_version + 1}
# ── Workspace (fork / submit / merge) ─────────────────────────────────────────
def workspace_fork(conn, user_id: str, project_id: str) -> dict:
"""
Create a personal workspace forking the current shared state.
Returns {'ok': True, 'workspace_id': ..., 'base_version': ...}
or {'ok': False, 'error': ...}.
"""
proj = project_get(conn, project_id)
if proj is None:
return {"ok": False, "error": "project_not_found"}
base_version = proj["shared_version"]
# Read current shared context content to seed the workspace
ctx = context_read(conn, project_id)
shared_content = ctx["content"] if ctx else ""
ws_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO user_workspaces (workspace_id, user_id, project_id, base_version) "
"VALUES (?, ?, ?, ?)",
(ws_id, user_id, project_id, base_version),
)
# Seed workspace with current shared content
conn.execute(
"INSERT INTO workspace_files (workspace_id, file_path, content) VALUES (?, 'context.md', ?)",
(ws_id, shared_content),
)
return {"ok": True, "workspace_id": ws_id, "base_version": base_version}
def workspace_get(conn, workspace_id: str) -> dict | None:
return _row_to_dict(conn.execute(
"SELECT * FROM user_workspaces WHERE workspace_id = ?", (workspace_id,)
).fetchone())
def workspace_list_for_user(conn, user_id: str, project_id: str | None = None) -> list[dict]:
if project_id:
rows = conn.execute(
"SELECT * FROM user_workspaces WHERE user_id = ? AND project_id = ? ORDER BY created_at DESC",
(user_id, project_id),
).fetchall()
else:
rows = conn.execute(
"SELECT * FROM user_workspaces WHERE user_id = ? ORDER BY created_at DESC",
(user_id,),
).fetchall()
return [dict(r) for r in rows]
def workspace_read_file(conn, workspace_id: str, file_path: str = "context.md") -> str | None:
row = conn.execute(
"SELECT content FROM workspace_files WHERE workspace_id = ? AND file_path = ?",
(workspace_id, file_path),
).fetchone()
return row["content"] if row else None
def workspace_write_file(conn, workspace_id: str, content: str,
file_path: str = "context.md"):
existing = conn.execute(
"SELECT 1 FROM workspace_files WHERE workspace_id = ? AND file_path = ?",
(workspace_id, file_path),
).fetchone()
if existing:
conn.execute(
"UPDATE workspace_files SET content = ?, version = version + 1, updated_at = ? "
"WHERE workspace_id = ? AND file_path = ?",
(content, now(), workspace_id, file_path),
)
else:
conn.execute(
"INSERT INTO workspace_files (workspace_id, file_path, content) VALUES (?, ?, ?)",
(workspace_id, file_path, content),
)
def workspace_submit(conn, workspace_id: str, submitted_by: str,
diff_summary: str = "") -> dict:
"""
Submit a workspace for review / direct merge.
If the submitter is admin, the request auto-approves and merges.
Otherwise creates a pending change request.
"""
ws = workspace_get(conn, workspace_id)
if ws is None:
return {"ok": False, "error": "workspace_not_found"}
if ws["status"] != "in_progress":
return {"ok": False, "error": f"workspace is {ws['status']}, not in_progress"}
user = user_get(conn, submitted_by)
if user is None:
return {"ok": False, "error": "user_not_found"}
project = project_get(conn, ws["project_id"])
target_version = project["shared_version"] + 1
base_version = ws["base_version"]
# Check for conflict
if base_version != project["shared_version"]:
return {
"ok": False, "error": "conflict",
"current_version": project["shared_version"],
"base_version": base_version,
}
if user["role"] == "admin":
# Auto-merge
content = workspace_read_file(conn, workspace_id)
if content is None:
return {"ok": False, "error": "workspace_empty"}
result = context_update(
conn, ws["project_id"], content, submitted_by, base_version
)
if not result["ok"]:
return result
conn.execute(
"UPDATE user_workspaces SET status = 'merged' WHERE workspace_id = ?",
(workspace_id,),
)
return {"ok": True, "action": "merged", **result}
else:
# Create pending change request
req_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO change_requests (request_id, workspace_id, project_id, "
"submitted_by, target_version, base_version, diff_summary) "
"VALUES (?, ?, ?, ?, ?, ?, ?)",
(req_id, workspace_id, ws["project_id"], submitted_by,
target_version, base_version, diff_summary),
)
conn.execute(
"UPDATE user_workspaces SET status = 'submitted' WHERE workspace_id = ?",
(workspace_id,),
)
return {"ok": True, "action": "submitted", "request_id": req_id}
def workspace_abandon(conn, workspace_id: str):
conn.execute(
"UPDATE user_workspaces SET status = 'abandoned' WHERE workspace_id = ?",
(workspace_id,),
)
def change_request_approve(conn, request_id: str, reviewer_id: str,
comments: str = "") -> dict:
"""Approve a change request and merge it into shared context."""
row = conn.execute(
"SELECT cr.*, ws.project_id FROM change_requests cr "
"JOIN user_workspaces ws ON ws.workspace_id = cr.workspace_id "
"WHERE cr.request_id = ?", (request_id,)
).fetchone()
if row is None:
return {"ok": False, "error": "not_found"}
if row["status"] != "pending":
return {"ok": False, "error": f"status is {row['status']}"}
# Record review
conn.execute(
"INSERT INTO reviews (request_id, reviewer_id, decision, comments) VALUES (?, ?, 'approved', ?)",
(request_id, reviewer_id, comments),
)
# Merge
content = workspace_read_file(conn, row["workspace_id"])
if content is None:
return {"ok": False, "error": "workspace_empty"}
result = context_update(
conn, row["project_id"], content, row["submitted_by"], row["base_version"]
)
if not result["ok"]:
return result
conn.execute(
"UPDATE change_requests SET status = 'merged' WHERE request_id = ?",
(request_id,),
)
conn.execute(
"UPDATE user_workspaces SET status = 'merged' WHERE workspace_id = ?",
(row["workspace_id"],),
)
return {"ok": True, "action": "merged", **result}
# ── Snapshots ─────────────────────────────────────────────────────────────────
def _snapshot_take(conn, project_id: str, version_from: int, version_to: int,
user_id: str | None = None, workspace_id: str | None = None):
"""Internal: snapshot current shared context before mutation."""
ctx = context_read(conn, project_id)
if ctx is None:
return
content = ctx["content"] or ""
import hashlib
content_hash = hashlib.sha256(content.encode()).hexdigest()
snap_id = str(uuid.uuid4())
ts = now().replace(":", "-")
storage_rel = f"{project_id}/{ts}__v{version_from}-{version_to}"
conn.execute(
"INSERT INTO snapshots (snapshot_id, project_id, user_id, workspace_id, "
"version_from, version_to, storage_path, content_hash, size_bytes) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(snap_id, project_id, user_id, workspace_id,
version_from, version_to, storage_rel, content_hash, len(content)),
)
def snapshot_list(conn, project_id: str) -> list[dict]:
rows = conn.execute(
"SELECT * FROM snapshots WHERE project_id = ? ORDER BY created_at DESC",
(project_id,),
).fetchall()
return [dict(r) for r in rows]
def snapshot_rotate(conn, project_id: str, max_keep: int = 25, min_keep: int = 5):
"""
Prune excess snapshots for a project, keeping at least min_keep.
Returns count of pruned snapshots.
"""
rows = conn.execute(
"SELECT snapshot_id FROM snapshots WHERE project_id = ? "
"ORDER BY created_at DESC", (project_id,)
).fetchall()
if len(rows) <= max_keep:
return 0
keep = max(min_keep, max_keep)
to_delete = [r["snapshot_id"] for r in rows[keep:]]
for sid in to_delete:
conn.execute("DELETE FROM snapshots WHERE snapshot_id = ?", (sid,))
return len(to_delete)
# ── Audit ─────────────────────────────────────────────────────────────────────
def audit_log(conn, user_id: str, operation: str, summary: str,
agent_id: str = "ctx", project_id: str | None = None,
entity_type: str | None = None, entity_id: str | None = None,
details: dict | None = None):
conn.execute(
"INSERT INTO audit_log (user_id, agent_id, project_id, operation, "
"entity_type, entity_id, summary, details_json) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, agent_id, project_id, operation,
entity_type, entity_id, summary,
json.dumps(details) if details else None),
)
def audit_query(conn, **filters) -> list[dict]:
"""Flexible audit query. Supports: user_id, project_id, operation, agent_id, limit."""
parts = ["SELECT * FROM audit_log"]
wheres = []
params = []
for col in ("user_id", "project_id", "operation", "agent_id"):
val = filters.get(col)
if val:
wheres.append(f"{col} = ?")
params.append(val)
if wheres:
parts.append("WHERE " + " AND ".join(wheres))
parts.append("ORDER BY entry_id DESC")
limit = filters.get("limit", 50)
parts.append(f"LIMIT {int(limit)}")
rows = conn.execute(" ".join(parts), params).fetchall()
return [dict(r) for r in rows]
# ── FTS Search ────────────────────────────────────────────────────────────────
def search(conn, query: str, limit: int = 10) -> list[dict]:
"""Full-text search across all indexed context content."""
rows = conn.execute(
"SELECT rowid, content, project_id, file_path, source_type, "
"rank FROM fts_context WHERE fts_context MATCH ? "
"ORDER BY rank LIMIT ?",
(query, limit),
).fetchall()
return [dict(r) for r in rows]
# ── Sync to project root ──────────────────────────────────────────────────────
def sync_to_project(conn, project_id: str) -> dict:
"""
Write CONTEXT.md (or fallback to compiled context) as AGENTS.md into
the project's sync_path, and create symlinks (CLAUDE.md, .cursorrules,
CODEX.md → AGENTS.md).
Returns {'ok': True, 'path': ...} or {'ok': False, 'error': ...}.
"""
proj = project_get(conn, project_id)
if proj is None:
return {"ok": False, "error": "project_not_found"}
sync_path = proj.get("sync_path")
if not sync_path:
return {"ok": False, "error": "no_sync_path_configured"}
# Try multi-file mode first: read CONTEXT.md from context_files
ctx_file = file_read(conn, project_id, "CONTEXT.md")
if ctx_file is not None:
content = ctx_file["content"]
else:
# Fallback to legacy single context
ctx = context_read(conn, project_id)
if ctx is None:
return {"ok": False, "error": "no_context"}
content = ctx["content"]
root = Path(sync_path)
if not root.exists():
return {"ok": False, "error": "sync_path_does_not_exist"}
agents_path = root / "AGENTS.md"
agents_path.write_text(content)
harnesses = {
"CLAUDE.md": True,
".cursorrules": True,
"CODEX.md": True,
".windsurfrules": True,
}
gh = root / ".github"
for name, _ in harnesses.items():
link = root / name
if link.exists() and link.is_symlink():
link.unlink()
elif link.exists() and not link.is_symlink():
# Existing file — skip, don't replace
continue
try:
link.symlink_to("AGENTS.md")
except OSError:
pass # symlinks not supported on this platform
# .github/copilot-instructions.md
if not gh.exists():
gh.mkdir(exist_ok=True)
copilot = gh / "copilot-instructions.md"
if copilot.exists() and copilot.is_symlink():
copilot.unlink()
if not copilot.exists():
try:
copilot.symlink_to("../AGENTS.md")
except OSError:
pass
return {"ok": True, "path": str(agents_path)}
# ── Context Files (multi-file per project) ─────────────────────────────────────
# Default files created when a project is created (or migrated)
DEFAULT_FILES = ["CONTEXT.md", "DECISIONS.md", "RUNBOOKS.md", "PROMPTS.md", "GLOSSARY.md"]
def normalize_file_path(file_path: str) -> str:
"""Normalize context file paths to uppercase .MD names."""
file_path = (file_path or "").strip().replace("\\", "/").lstrip("/")
if not file_path.lower().endswith(".md"):
file_path = file_path + ".md"
return file_path.upper()
def file_list(conn, project_id: str) -> list[dict]:
"""List all files for a project. Returns list of {file_id, file_path, version, updated_at, updated_by}."""
rows = conn.execute(
"SELECT file_id, file_path, version, updated_by, updated_at "
"FROM context_files WHERE project_id = ? ORDER BY file_path",
(project_id,)
).fetchall()
return [_row_to_dict(r) for r in rows]
def file_read(conn, project_id: str, file_path: str) -> dict | None:
"""Read a single context file. Returns with dynamic metadata header prepended."""
file_path = normalize_file_path(file_path)
row = conn.execute(
"SELECT cf.*, p.display_name FROM context_files cf "
"JOIN projects p ON p.project_id = cf.project_id "
"WHERE cf.project_id = ? AND cf.file_path = ?",
(project_id, file_path)
).fetchone()
if row is None:
return None
result = _row_to_dict(row)
content = result.get("content", "")
# Strip any existing headers before prepending fresh one
content = strip_metadata_header(content)
import re
body = content.lstrip("\n\r ")
body = re.sub(r"^(<!--.*?-->\s*\n*)*", "", body, flags=re.DOTALL)
body = re.sub(r"^---\s*\n.*?\n---\s*\n?", "", body, flags=re.DOTALL)
body = body.strip()
project_tags = project_get_tags(conn, project_id)
header = build_metadata_header(
project_id=project_id,
display_name=result.get("display_name"),
version=result.get("version", 1),
updated_at=result.get("updated_at"),
tags=project_tags or None,
)
result["content"] = header + body
return result
def file_create(conn, project_id: str, file_path: str, content: str = "",
updated_by: str = "admin") -> dict:
"""Create a new context file. Returns {'ok': True} or {'ok': False, 'error': ...}."""
# Check project exists
proj = project_get(conn, project_id)
if proj is None:
return {"ok": False, "error": "project_not_found"}
# Normalize file_path before lookup: uppercase, .MD extension
file_path = normalize_file_path(file_path)
# Check if file already exists
existing = conn.execute(
"SELECT file_id FROM context_files WHERE project_id = ? AND file_path = ?",
(project_id, file_path)
).fetchone()
if existing:
return {"ok": False, "error": "file_exists"}
# Strip any headers from initial content
clean = strip_metadata_header(content)
clean = clean.lstrip("\n\r ").strip()
conn.execute(
"INSERT INTO context_files (project_id, file_path, content, version, updated_by) "
"VALUES (?, ?, ?, 1, ?)",
(project_id, file_path, clean, updated_by)
)
audit_log(conn, updated_by, "create", f"Created file {file_path} in {project_id}",
project_id=project_id, entity_type="file", entity_id=file_path)
return {"ok": True, "file_path": file_path}
def file_update(conn, project_id: str, file_path: str, new_content: str,
updated_by: str, base_version: int) -> dict:
"""Update a context file with optimistic concurrency.
Returns {'ok': True, 'new_version': N} or {'ok': False, 'error': ...}."""
# Normalize
file_path = normalize_file_path(file_path)
row = conn.execute(
"SELECT version FROM context_files WHERE project_id = ? AND file_path = ?",
(project_id, file_path)
).fetchone()
if row is None:
return {"ok": False, "error": "not_found"}
current_version = row["version"]
if base_version != current_version:
return {"ok": False, "error": "conflict", "current_version": current_version}
# Strip headers before storing
clean = strip_metadata_header(new_content)
import re
clean = re.sub(r"^\s*(?:<!--.*?-->\s*)*", "", clean, flags=re.DOTALL)
clean = re.sub(r"^\s*---\n.*?\n---\n", "", clean, count=1, flags=re.DOTALL)
clean = clean.lstrip().strip()
new_version = current_version + 1
conn.execute(
"UPDATE context_files SET content = ?, version = ?, updated_by = ?, "
"updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') "
"WHERE project_id = ? AND file_path = ?",
(clean, new_version, updated_by, project_id, file_path)
)
audit_log(conn, updated_by, "update", f"Updated {file_path} in {project_id} to v{new_version}",
project_id=project_id, entity_type="file", entity_id=file_path)
return {"ok": True, "new_version": new_version}
def file_delete(conn, project_id: str, file_path: str) -> dict:
"""Delete a context file. Returns {'ok': True} or {'ok': False, 'error': ...}."""
file_path = normalize_file_path(file_path)
# Don't allow deleting CONTEXT.md (it's the canonical synced file)
if file_path == "CONTEXT.MD":
return {"ok": False, "error": "cannot_delete_context"}
row = conn.execute(
"SELECT file_id FROM context_files WHERE project_id = ? AND file_path = ?",
(project_id, file_path)
).fetchone()
if row is None:
return {"ok": False, "error": "not_found"}
conn.execute(
"DELETE FROM context_files WHERE project_id = ? AND file_path = ?",
(project_id, file_path)
)
audit_log(conn, "admin", "delete", f"Deleted {file_path} from {project_id}",
project_id=project_id, entity_type="file", entity_id=file_path)
return {"ok": True}
def compiled_read(conn, project_id: str) -> dict | None:
"""Read all files for a project, concatenated into a single compiled view.
Each file is preceded by a file-name header. The metadata header is prepended once at the top.
This is what get_project_context returns when multi-file is enabled."""
proj = project_get(conn, project_id)
if proj is None:
return None
# Get all files
files = conn.execute(
"SELECT file_path, content, version, updated_at, updated_by "
"FROM context_files WHERE project_id = ? ORDER BY file_path",
(project_id,)
).fetchall()
if not files:
# Fallback to legacy single-context mode
return context_read(conn, project_id)
# Build compiled view
sections = []
for f in files:
file_path = f["file_path"]
content = f["content"]
# Strip any existing headers
content = strip_metadata_header(content).strip()
# Add file-name separator header
sections.append(f"## {file_path}\n\n{content}")
compiled_body = "\n\n---\n\n".join(sections)
# Prepend dynamic metadata header
project_tags = project_get_tags(conn, project_id)
header = build_metadata_header(
project_id=project_id,
display_name=proj.get("display_name"),
version=proj.get("shared_version", 0),
updated_at=proj.get("updated_at"),
tags=project_tags or None,
)
# Get the latest version from project_context (for version checking)
ctx_row = conn.execute(
"SELECT version FROM project_context WHERE project_id = ?",
(project_id,)
).fetchone()
version = ctx_row["version"] if ctx_row else 0
return {
"project_id": project_id,
"version": version,
"content": header + compiled_body,
"files": [f["file_path"] for f in files],
}
def ensure_default_files(conn, project_id: str):
"""Create default context files for a project if they don't exist.
Migrates existing single-context content into CONTEXT.md."""
# Check if any files already exist
existing = conn.execute(
"SELECT COUNT(*) as cnt FROM context_files WHERE project_id = ?",
(project_id,)
).fetchone()
if existing and existing["cnt"] > 0:
return # Already has files
# Get existing single-context content to migrate into CONTEXT.md
ctx_row = conn.execute(
"SELECT content FROM project_context WHERE project_id = ?",
(project_id,)
).fetchone()
existing_content = ctx_row["content"] if ctx_row else ""
existing_content = strip_metadata_header(existing_content).strip()
# Create CONTEXT.md with existing content
conn.execute(
"INSERT INTO context_files (project_id, file_path, content, version, updated_by) "
"VALUES (?, 'CONTEXT.MD', ?, 1, 'admin')",
(project_id, existing_content)
)
# Create empty default files
for fname in DEFAULT_FILES:
if fname == "CONTEXT.md":
continue # Already created above
conn.execute(
"INSERT INTO context_files (project_id, file_path, content, version, updated_by) "
"VALUES (?, ?, '', 1, 'admin')",
(project_id, fname.upper())
)
+291
View File
@@ -0,0 +1,291 @@
"""
ctxd MCP server — stdio transport for Hermes integration.
Communicates over stdin/stdout using JSON-RPC 2.0.
Hermes spawns this process and discovers tools automatically.
"""
import json
import sys
import traceback
from ctxd import db as _db
from ctxd.config import CtxConfig
def main():
cfg = CtxConfig.from_home()
if not cfg.db_path.exists():
print("ctxd not initialized. Run 'ctx init' first.", file=sys.stderr)
sys.exit(1)
conn = _db.init_db(cfg)
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
req = json.loads(line)
except json.JSONDecodeError:
continue
req_id = req.get("id")
method = req.get("method", "")
params = req.get("params", {})
try:
if method == "initialize":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "ctxd", "version": "0.1.0"},
},
}
elif method == "notifications/initialized":
continue
elif method == "tools/list":
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"tools": [
{
"name": "get_project_context",
"description": "Read the current shared context (compiled markdown) for a project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug e.g. remote-rig",
}
},
"required": ["project_id"],
},
},
{
"name": "list_projects",
"description": "List all projects in the context store",
"inputSchema": {"type": "object", "properties": {}},
},
{
"name": "get_user_profile",
"description": "Get a user's profile (timezone, preferences, style)",
"inputSchema": {
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID e.g. admin",
}
},
"required": ["user_id"],
},
},
{
"name": "search_context",
"description": "Full-text search across all project context and user profiles",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search terms",
},
"limit": {
"type": "integer",
"description": "Max results (default 10)",
},
},
"required": ["query"],
},
},
{
"name": "sync_to_project",
"description": "Write current shared context as AGENTS.md + symlinks to project root",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
},
{
"name": "get_project_tags",
"description": "Get the metadata tags for a project",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
},
{
"name": "set_project_tags",
"description": "Set metadata tags for a project (replaces all tags)",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "List of tag strings e.g. ARCHITECTURE, 3D-PRINTING",
},
},
"required": ["project_id", "tags"],
},
},
{
"name": "auto_generate_tags",
"description": "Use AI to analyze a project's context content and suggest appropriate metadata tags",
"inputSchema": {
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
},
]
},
}
elif method == "tools/call":
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
resp = {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"content": [
{
"type": "text",
"text": _handle_tool_call(conn, tool_name, arguments),
}
]
},
}
else:
# Unknown method — return empty error
resp = {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
except Exception as e:
print(f"ctxd error: {e}", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
resp = {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32603, "message": str(e)},
}
print(json.dumps(resp), flush=True)
def _handle_tool_call(conn, tool_name, arguments) -> str:
import json as _json
if tool_name == "list_projects":
projects = _db.project_list(conn)
return _json.dumps(projects, indent=2)
if tool_name == "get_project_context":
pid = arguments["project_id"]
ctx = _db.context_read(conn, pid)
if ctx is None:
return _json.dumps({"error": f"Project '{pid}' not found"})
_db.audit_log(conn, "hermes-gateway", "read",
f"MCP read context for {pid}",
agent_id="hermes", project_id=pid,
entity_type="project", entity_id=pid)
conn.commit()
return _json.dumps({
"project_id": pid,
"version": ctx["version"],
"updated_by": ctx.get("updated_by"),
"updated_at": ctx.get("updated_at"),
"content": ctx["content"],
}, indent=2)
if tool_name == "get_user_profile":
uid = arguments["user_id"]
profile = _db.profile_read(conn, uid)
if profile is None:
return _json.dumps({"error": f"User '{uid}' not found"})
return _json.dumps({"user_id": uid, "content": profile["content"], "version": profile["version"]}, indent=2)
if tool_name == "search_context":
query = arguments["query"]
limit = arguments.get("limit", 10)
results = _db.search(conn, query, limit=limit)
_db.audit_log(conn, "hermes-gateway", "search",
f"MCP search: {query[:80]}",
agent_id="hermes")
conn.commit()
return _json.dumps(results, indent=2)
if tool_name == "sync_to_project":
pid = arguments["project_id"]
result = _db.sync_to_project(conn, pid)
conn.commit()
return _json.dumps(result, indent=2)
if tool_name == "get_project_tags":
pid = arguments["project_id"]
tags = _db.project_get_tags(conn, pid)
return _json.dumps({"project_id": pid, "tags": tags}, indent=2)
if tool_name == "set_project_tags":
pid = arguments["project_id"]
tags = arguments["tags"]
_db.project_set_tags(conn, pid, tags)
_db.audit_log(conn, "hermes-gateway", "update",
f"Set tags for {pid}: {tags}",
agent_id="hermes", project_id=pid,
entity_type="project", entity_id=pid)
conn.commit()
return _json.dumps({"ok": True, "project_id": pid, "tags": tags}, indent=2)
if tool_name == "auto_generate_tags":
pid = arguments["project_id"]
ctx = _db.context_read(conn, pid)
if ctx is None:
return _json.dumps({"error": f"Project '{pid}' not found"})
# Return the context content and ask the LLM to suggest tags
tags = _db.project_get_tags(conn, pid)
return _json.dumps({
"project_id": pid,
"project_name": pid,
"current_tags": tags,
"content": ctx.get("content", ""),
"action_required": "Analyze the content above and suggest appropriate metadata tags. Then call set_project_tags with your suggestions.",
}, indent=2)
return _json.dumps({"error": f"Unknown tool: {tool_name}"})
if __name__ == "__main__":
main()
+292
View File
@@ -0,0 +1,292 @@
-- ============================================================================
-- ctxd — Context Daemon Schema
-- SQLite 3.x, WAL mode, FTS5
-- ============================================================================
-- WAL for concurrent reads during writes; foreign keys enforced
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
-- ============================================================================
-- USERS
-- ============================================================================
CREATE TABLE users (
user_id TEXT PRIMARY KEY, -- uuid or "joshua", "polly", "hermes-gateway"
display_name TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'contributor'
CHECK (role IN ('admin', 'contributor', 'service')),
token_hash TEXT, -- NULL = no auth (localhost/trusted)
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
CREATE UNIQUE INDEX idx_users_lower ON users (LOWER(user_id));
-- ============================================================================
-- PROJECTS
-- ============================================================================
CREATE TABLE projects (
project_id TEXT PRIMARY KEY, -- uuid or slug "remote-rig"
display_name TEXT NOT NULL,
description TEXT,
metadata_tags TEXT DEFAULT '[]', -- JSON array of tag strings e.g. '["ARCHITECTURE","3D-PRINTING"]'
shared_version INTEGER NOT NULL DEFAULT 0, -- monotonically increasing
auto_sync INTEGER NOT NULL DEFAULT 0, -- boolean: auto-write AGENTS.md to sync_path
sync_path TEXT, -- absolute path to project root (nullable)
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- ============================================================================
-- PROJECT PERMISSIONS (admin overrides all)
-- ============================================================================
CREATE TABLE project_permissions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL REFERENCES projects(project_id) ON DELETE CASCADE,
user_id TEXT NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
permission TEXT NOT NULL DEFAULT 'editor'
CHECK (permission IN ('owner', 'editor', 'viewer')),
UNIQUE(project_id, user_id)
);
-- ============================================================================
-- USER PROFILES (personal context — timezone, preferences, style)
-- ============================================================================
CREATE TABLE user_profiles (
user_id TEXT PRIMARY KEY REFERENCES users(user_id) ON DELETE CASCADE,
content TEXT NOT NULL DEFAULT '', -- markdown
version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- ============================================================================
-- PROJECT CONTEXT — THE AUTHORITATIVE SHARED COPY
-- This is the compiled markdown: context.md + decisions/ + runbooks/
-- that gets served to agents.
-- ============================================================================
CREATE TABLE project_context (
project_id TEXT PRIMARY KEY REFERENCES projects(project_id) ON DELETE CASCADE,
content TEXT NOT NULL DEFAULT '', -- compiled markdown
version INTEGER NOT NULL DEFAULT 0, -- mirrors projects.shared_version
updated_by TEXT REFERENCES users(user_id),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- ============================================================================
-- CONTEXT FILES — individual files within a project (decisions/, runbooks/)
-- The daemon compiles these into project_context.content on demand.
-- version tracks this file's edit count (independent of the shared version).
-- ============================================================================
CREATE TABLE context_files (
file_id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id TEXT NOT NULL REFERENCES projects(project_id) ON DELETE CASCADE,
file_path TEXT NOT NULL, -- "decisions/001-use-go.md"
content TEXT NOT NULL DEFAULT '',
version INTEGER NOT NULL DEFAULT 1, -- per-file edit counter
updated_by TEXT REFERENCES users(user_id),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(project_id, file_path)
);
-- ============================================================================
-- USER WORKSPACES — per-user forks of a project
-- When a user edits, they work on their own copy. base_version tracks which
-- shared version they started from. current_version tracks their edits.
-- ============================================================================
CREATE TABLE user_workspaces (
workspace_id TEXT PRIMARY KEY, -- uuid
user_id TEXT NOT NULL REFERENCES users(user_id) ON DELETE CASCADE,
project_id TEXT NOT NULL REFERENCES projects(project_id) ON DELETE CASCADE,
status TEXT NOT NULL DEFAULT 'in_progress'
CHECK (status IN ('in_progress', 'submitted', 'merged', 'abandoned')),
base_version INTEGER NOT NULL, -- shared version at fork time
current_version INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(user_id, project_id, status) -- one active workspace per user per project
);
-- ============================================================================
-- WORKSPACE FILES — per-user fork of context_files
-- Mirrors the same file_path as context_files but in the user's workspace.
-- ============================================================================
CREATE TABLE workspace_files (
file_id INTEGER PRIMARY KEY AUTOINCREMENT,
workspace_id TEXT NOT NULL REFERENCES user_workspaces(workspace_id) ON DELETE CASCADE,
file_path TEXT NOT NULL,
content TEXT NOT NULL DEFAULT '',
version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(workspace_id, file_path)
);
-- ============================================================================
-- CHANGE REQUESTS — submit / review / merge workflow
-- ============================================================================
CREATE TABLE change_requests (
request_id TEXT PRIMARY KEY, -- uuid
workspace_id TEXT NOT NULL REFERENCES user_workspaces(workspace_id) ON DELETE CASCADE,
project_id TEXT NOT NULL REFERENCES projects(project_id),
submitted_by TEXT NOT NULL REFERENCES users(user_id),
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'approved', 'rejected', 'merged')),
-- Snapshot of what changed, stored inline so reviews survive workspace mutation
diff_summary TEXT, -- free-text summary of changes
target_version INTEGER NOT NULL, -- the shared version this would bump to
base_version INTEGER NOT NULL, -- the shared version they forked from
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
updated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- ============================================================================
-- REVIEWS — approvals/rejections on change requests
-- ============================================================================
CREATE TABLE reviews (
review_id INTEGER PRIMARY KEY AUTOINCREMENT,
request_id TEXT NOT NULL REFERENCES change_requests(request_id) ON DELETE CASCADE,
reviewer_id TEXT NOT NULL REFERENCES users(user_id),
decision TEXT NOT NULL CHECK (decision IN ('approved', 'rejected')),
comments TEXT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')),
UNIQUE(request_id, reviewer_id)
);
-- ============================================================================
-- SNAPSHOTS — point-in-time copies of project or workspace content
-- Stored as files on disk at the path in storage_path.
-- ============================================================================
CREATE TABLE snapshots (
snapshot_id TEXT PRIMARY KEY, -- uuid
project_id TEXT NOT NULL REFERENCES projects(project_id) ON DELETE CASCADE,
-- NULL user_id = snapshot of the shared copy; non-NULL = snapshot of a user workspace
user_id TEXT REFERENCES users(user_id) ON DELETE CASCADE,
workspace_id TEXT REFERENCES user_workspaces(workspace_id) ON DELETE SET NULL,
version_from INTEGER, -- version range this snapshot covers
version_to INTEGER,
storage_path TEXT NOT NULL, -- relative to ~/.ctx/snapshots/
content_hash TEXT NOT NULL, -- sha256 of the compiled markdown
size_bytes INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- Index for snapshot rotation queries
CREATE INDEX idx_snapshots_cleanup ON snapshots (project_id, user_id, created_at);
-- ============================================================================
-- AUDIT LOG — append-only (INSERT only, never UPDATE or DELETE)
-- ============================================================================
CREATE TABLE audit_log (
entry_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL REFERENCES users(user_id),
agent_id TEXT NOT NULL DEFAULT 'cli', -- "hermes", "claude-code", "ctx"
session_id TEXT, -- opaque session identifier
project_id TEXT REFERENCES projects(project_id),
operation TEXT NOT NULL
CHECK (operation IN (
'read', 'update', 'create', 'delete',
'submit', 'approve', 'reject', 'merge',
'sync', 'search', 'export', 'restore',
'login', 'logout', 'import'
)),
entity_type TEXT, -- 'project', 'workspace', 'change_request', 'snapshot', 'user_profile'
entity_id TEXT, -- polymorphic reference
summary TEXT NOT NULL, -- human-readable: "Updated camera-node wiring section"
details_json TEXT, -- structured payload: diff, version numbers, etc.
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ', 'now'))
);
-- Audit queries by user, project, or time range
CREATE INDEX idx_audit_user ON audit_log (user_id, created_at);
CREATE INDEX idx_audit_project ON audit_log (project_id, created_at);
CREATE INDEX idx_audit_agent ON audit_log (agent_id, created_at);
CREATE INDEX idx_audit_op ON audit_log (operation, created_at);
-- Trigger: audit_log is append-only — enforce no updates or deletes at the DB level
CREATE TRIGGER tr_audit_log_no_update
BEFORE UPDATE ON audit_log
BEGIN
SELECT RAISE(ABORT, 'audit_log is append-only — no updates allowed');
END;
CREATE TRIGGER tr_audit_log_no_delete
BEFORE DELETE ON audit_log
BEGIN
SELECT RAISE(ABORT, 'audit_log is append-only — no deletes allowed');
END;
-- ============================================================================
-- FULL-TEXT SEARCH (FTS5)
-- ============================================================================
CREATE VIRTUAL TABLE fts_context USING fts5(
content,
project_id UNINDEXED,
file_path UNINDEXED,
source_type UNINDEXED, -- 'project_context', 'context_file', 'user_profile', 'workspace_file'
tokenize='porter unicode61'
);
-- Triggers to keep FTS index in sync with project_context
CREATE TRIGGER tr_fts_project_context_insert AFTER INSERT ON project_context
BEGIN
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.rowid, NEW.content, NEW.project_id, 'context.md', 'project_context');
END;
CREATE TRIGGER tr_fts_project_context_update AFTER UPDATE ON project_context
BEGIN
DELETE FROM fts_context WHERE rowid = OLD.rowid;
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.rowid, NEW.content, NEW.project_id, 'context.md', 'project_context');
END;
CREATE TRIGGER tr_fts_project_context_delete AFTER DELETE ON project_context
BEGIN
DELETE FROM fts_context WHERE rowid = OLD.rowid;
END;
-- Triggers for context_files
CREATE TRIGGER tr_fts_context_files_insert AFTER INSERT ON context_files
BEGIN
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.file_id + 1000000, NEW.content, NEW.project_id, NEW.file_path, 'context_file');
END;
CREATE TRIGGER tr_fts_context_files_update AFTER UPDATE ON context_files
BEGIN
DELETE FROM fts_context WHERE rowid = OLD.file_id + 1000000;
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.file_id + 1000000, NEW.content, NEW.project_id, NEW.file_path, 'context_file');
END;
CREATE TRIGGER tr_fts_context_files_delete AFTER DELETE ON context_files
BEGIN
DELETE FROM fts_context WHERE rowid = OLD.file_id + 1000000;
END;
-- Triggers for user_profiles
CREATE TRIGGER tr_fts_user_profiles_insert AFTER INSERT ON user_profiles
BEGIN
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.rowid + 2000000, NEW.content, '~user~', NEW.user_id, 'user_profile');
END;
CREATE TRIGGER tr_fts_user_profiles_update AFTER UPDATE ON user_profiles
BEGIN
DELETE FROM fts_context WHERE rowid = OLD.rowid + 2000000;
INSERT INTO fts_context(rowid, content, project_id, file_path, source_type)
VALUES (NEW.rowid + 2000000, NEW.content, '~user~', NEW.user_id, 'user_profile');
END;
-- ============================================================================
-- SEED DATA (for development / first-run)
-- ============================================================================
INSERT INTO users (user_id, display_name, role) VALUES
('admin', 'Administrator', 'admin'),
('hermes-gateway', 'Hermes Agent', 'service');
INSERT INTO projects (project_id, display_name, description) VALUES
('welcome', 'Welcome', 'Getting started guide and documentation for ctxd'),
('remote-rig', 'RemoteRig', 'Multi-camera remote monitoring system');
-- Project context is seeded by the Python init code (cmd_init)
-- to ensure real newlines, not literal backslash-n from SQL strings.
+743
View File
@@ -0,0 +1,743 @@
"""
ctxd server — dual-protocol daemon serving context over MCP + HTTP.
"""
import asyncio
import json
import logging
import os
import sys
from pathlib import Path
logger = logging.getLogger("ctxd")
try:
import mcp.types as types
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
except ImportError:
types = None
Server = None
from . import db as _db
from .config import CtxConfig
# ── MCP Server ─────────────────────────────────────────────────────────────────
def make_mcp_server(cfg: CtxConfig):
"""Create an MCP server instance wired to our database."""
app = Server("context-dossier")
def _conn():
"""Short-lived connection per request (WAL allows concurrent reads)."""
return _db.init_db(cfg)
@app.list_tools()
async def list_tools():
return [
types.Tool(
name="get_project_context",
description="Read the current shared context for a project (compiled markdown)",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug e.g. remote-rig",
}
},
"required": ["project_id"],
},
),
types.Tool(
name="list_projects",
description="List all projects available in the context store",
inputSchema={
"type": "object",
"properties": {},
},
),
types.Tool(
name="get_user_profile",
description="Get the user profile (personal preferences, timezone, style)",
inputSchema={
"type": "object",
"properties": {
"user_id": {
"type": "string",
"description": "User ID e.g. admin",
}
},
"required": ["user_id"],
},
),
types.Tool(
name="search_context",
description="Full-text search across all project context and user profiles",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search terms",
},
"limit": {
"type": "integer",
"description": "Max results (default 10)",
},
},
"required": ["query"],
},
),
types.Tool(
name="sync_to_project",
description="Write current shared context as AGENTS.md + symlinks to the project root",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
),
types.Tool(
name="get_project_tags",
description="Get metadata tags for a project",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
),
types.Tool(
name="set_project_tags",
description="Set metadata tags for a project (replaces all tags)",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
},
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Uppercase metadata tags",
},
},
"required": ["project_id", "tags"],
},
),
types.Tool(
name="auto_generate_tags",
description="Return project content for AI analysis and tag suggestion, then call set_project_tags with suggestions",
inputSchema={
"type": "object",
"properties": {
"project_id": {
"type": "string",
"description": "Project slug",
}
},
"required": ["project_id"],
},
),
types.Tool(
name="list_files",
description="List all context files for a project (multi-file mode)",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Project slug"},
},
"required": ["project_id"],
},
),
types.Tool(
name="get_file",
description="Read a single context file from a project. Returns content with metadata header.",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Project slug"},
"file_path": {"type": "string", "description": "File name e.g. CONTEXT.md, DECISIONS.md"},
},
"required": ["project_id", "file_path"],
},
),
types.Tool(
name="update_file",
description="Update a single context file in a project with optimistic version checking",
inputSchema={
"type": "object",
"properties": {
"project_id": {"type": "string", "description": "Project slug"},
"file_path": {"type": "string", "description": "File name e.g. CONTEXT.md"},
"content": {"type": "string", "description": "New file content (markdown)"},
"base_version": {"type": "integer", "description": "Current version of the file (for conflict detection)"},
},
"required": ["project_id", "file_path", "content", "base_version"],
},
),
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
conn = _conn()
try:
if name == "list_projects":
projects = _db.project_list(conn)
return [types.TextContent(
type="text",
text=json.dumps(projects, indent=2),
)]
elif name == "get_project_context":
pid = arguments["project_id"]
# Use compiled view (all files concatenated) if multi-file exists
ctx = _db.compiled_read(conn, pid)
if ctx is None:
return [types.TextContent(type="text", text=f"Project '{pid}' not found")]
result = {
"project_id": pid,
"version": ctx["version"],
"updated_by": ctx.get("updated_by"),
"updated_at": ctx.get("updated_at"),
"content": ctx["content"],
}
_db.audit_log(conn, "hermes-gateway", "read",
f"MCP read context for {pid}",
agent_id="hermes", project_id=pid,
entity_type="project", entity_id=pid)
conn.commit()
return [types.TextContent(type="text", text=json.dumps(result, indent=2))]
elif name == "get_user_profile":
uid = arguments["user_id"]
profile = _db.profile_read(conn, uid)
if profile is None:
return [types.TextContent(type="text", text=f"User '{uid}' not found")]
return [types.TextContent(type="text", text=json.dumps(dict(profile), indent=2))]
elif name == "search_context":
query = arguments["query"]
limit = arguments.get("limit", 10)
results = _db.search(conn, query, limit=limit)
_db.audit_log(conn, "hermes-gateway", "search",
f"MCP search: {query[:80]}",
agent_id="hermes")
conn.commit()
return [types.TextContent(
type="text",
text=json.dumps(results, indent=2),
)]
elif name == "sync_to_project":
pid = arguments["project_id"]
result = _db.sync_to_project(conn, pid)
conn.commit()
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2),
)]
elif name == "get_project_tags":
pid = arguments["project_id"]
tags = _db.project_get_tags(conn, pid)
return [types.TextContent(
type="text",
text=json.dumps({"project_id": pid, "tags": tags}, indent=2),
)]
elif name == "set_project_tags":
pid = arguments["project_id"]
tags = [str(t).upper().replace(" ", "-") for t in arguments.get("tags", [])]
_db.project_set_tags(conn, pid, tags)
_db.audit_log(conn, "hermes-gateway", "set_tags",
f"Set tags for {pid}: {', '.join(tags)}",
agent_id="hermes", project_id=pid,
entity_type="project", entity_id=pid)
conn.commit()
return [types.TextContent(
type="text",
text=json.dumps({"ok": True, "project_id": pid, "tags": tags}, indent=2),
)]
elif name == "auto_generate_tags":
pid = arguments["project_id"]
ctx = _db.compiled_read(conn, pid)
if ctx is None:
return [types.TextContent(type="text", text=f"Project '{pid}' not found")]
tags = _db.project_get_tags(conn, pid)
result = {
"project_id": pid,
"current_tags": tags,
"content": ctx.get("content", ""),
"action_required": "Analyze the content above and suggest concise uppercase metadata tags. Then call set_project_tags with your suggestions.",
}
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2),
)]
elif name == "list_files":
pid = arguments["project_id"]
files = _db.file_list(conn, pid)
return [types.TextContent(
type="text",
text=json.dumps({"project_id": pid, "files": files}, indent=2),
)]
elif name == "get_file":
pid = arguments["project_id"]
file_path = arguments["file_path"]
result = _db.file_read(conn, pid, file_path)
if result is None:
return [types.TextContent(type="text", text=f"File '{file_path}' not found in project '{pid}'")]
_db.audit_log(conn, "hermes-gateway", "read",
f"MCP read file {file_path} for {pid}",
agent_id="hermes", project_id=pid,
entity_type="file", entity_id=file_path)
conn.commit()
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2),
)]
elif name == "update_file":
pid = arguments["project_id"]
file_path = arguments["file_path"]
content = arguments["content"]
base_version = arguments["base_version"]
result = _db.file_update(conn, pid, file_path, content, "hermes-gateway", base_version)
if result.get("ok"):
conn.commit()
return [types.TextContent(
type="text",
text=json.dumps(result, indent=2),
)]
return [types.TextContent(type="text", text=f"Unknown tool: {name}")]
finally:
conn.close()
return app
# ── HTTP Server (stdlib-only, no dependencies) ────────────────────────────────
class HTTPServer:
"""Minimal HTTP server for REST access (ctx push/pull, status)."""
def __init__(self, cfg: CtxConfig):
self.cfg = cfg
self._conn = _db.init_db(cfg)
def handle(self, method: str, path: str, body: bytes | None) -> tuple[int, dict, str]:
"""Returns (status_code, headers, body_string)."""
try:
return self._route(method, path, body)
except Exception as e:
logger.exception("HTTP error")
return (500, {"Content-Type": "text/plain"}, f"Internal error: {e}")
def _route(self, method: str, path: str, body: bytes | None) -> tuple:
# GET / — serve the web UI
if method == "GET" and path == "/":
ui_path = Path(__file__).parent / "ui.html"
if ui_path.exists():
return (200, {
"Content-Type": "text/html; charset=utf-8",
"Cache-Control": "no-store, max-age=0",
}, ui_path.read_text())
return (200, {"Content-Type": "text/plain"}, "Context Dossier")
# GET /status — daemon health
if method == "GET" and path == "/status":
data = {"status": "ok", "db": str(self.cfg.db_path)}
return (200, {"Content-Type": "application/json"}, json.dumps(data))
# GET /projects — list projects
if method == "GET" and path == "/projects":
projects = _db.project_list(self._conn)
return (200, {"Content-Type": "application/json"}, json.dumps(projects))
# POST /projects — create a new project
if method == "POST" and path == "/projects":
payload = json.loads(body or b"{}")
pid = payload.get("project_id", "")
name = payload.get("display_name", pid)
desc = payload.get("description", "")
if not pid:
return (400, {"Content-Type": "application/json"}, json.dumps({"error": "project_id required"}))
try:
_db.project_create(self._conn, pid, name, desc)
_db.audit_log(self._conn, "admin", "create", f"Created project {pid}",
project_id=pid, entity_type="project", entity_id=pid)
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps({"ok": True, "project_id": pid}))
except Exception as e:
self._conn.rollback()
return (400, {"Content-Type": "application/json"}, json.dumps({"error": str(e)}))
# GET /projects/<id>/context — read project context (compiled view)
if method == "GET" and path.startswith("/projects/") and path.endswith("/context"):
pid = path.split("/")[2]
ctx = _db.compiled_read(self._conn, pid)
if ctx is None:
return (404, {"Content-Type": "text/plain"}, f"Project '{pid}' not found")
return (200, {"Content-Type": "application/json"}, json.dumps({
"project_id": pid,
"version": ctx["version"],
"content": ctx["content"],
"files": ctx.get("files", []),
}))
# POST /projects/<id>/context — update context (with version check)
if method == "POST" and path.startswith("/projects/") and path.endswith("/context"):
pid = path.split("/")[2]
payload = json.loads(body or b"{}")
content = payload.get("content", "")
updated_by = payload.get("updated_by", "admin")
base_version = payload.get("base_version", 0)
result = _db.context_update(
self._conn, pid, content, updated_by, base_version
)
self._conn.commit()
if result.get("ok"):
return (200, {"Content-Type": "application/json"}, json.dumps(result))
elif result.get("error") == "conflict":
return (409, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (404, {"Content-Type": "application/json"}, json.dumps(result))
# GET /projects/<id>/snapshots — list snapshots
if method == "GET" and path.startswith("/projects/") and path.endswith("/snapshots"):
pid = path.split("/")[2]
snaps = _db.snapshot_list(self._conn, pid)
return (200, {"Content-Type": "application/json"}, json.dumps(snaps))
# GET /projects/<id>/tags — get project tags
if method == "GET" and path.startswith("/projects/") and path.endswith("/tags"):
pid = path.split("/")[2]
tags = _db.project_get_tags(self._conn, pid)
return (200, {"Content-Type": "application/json"}, json.dumps({"project_id": pid, "tags": tags}))
# POST /projects/<id>/tags — set project tags
if method == "POST" and path.startswith("/projects/") and path.endswith("/tags"):
pid = path.split("/")[2]
payload = json.loads(body or b"{}")
tags = payload.get("tags", [])
_db.project_set_tags(self._conn, pid, tags)
_db.audit_log(self._conn, "admin", "update", f"Set tags for {pid}: {tags}",
project_id=pid, entity_type="project", entity_id=pid)
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps({"ok": True, "tags": tags}))
# POST /projects/<id>/sync — sync context to project root
if method == "POST" and path.startswith("/projects/") and path.endswith("/sync"):
pid = path.split("/")[2]
result = _db.sync_to_project(self._conn, pid)
self._conn.commit()
if result.get("ok"):
return (200, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (400, {"Content-Type": "application/json"}, json.dumps(result))
# POST /projects/<id>/import — import raw text content as project context
# Strips existing headers/frontmatter, stores clean body.
if method == "POST" and path.startswith("/projects/") and path.endswith("/import"):
pid = path.split("/")[2]
payload = json.loads(body or b"{}")
raw_content = payload.get("content", "")
updated_by = payload.get("updated_by", "admin")
base_version = payload.get("base_version", 0)
if not raw_content:
return (400, {"Content-Type": "application/json"}, json.dumps({"error": "content required"}))
# Strip any existing metadata headers / YAML frontmatter / HTML comments
clean = _db.strip_metadata_header(raw_content)
# Also strip leading HTML comments (vault imports)
import re
clean = re.sub(r"^\s*(?:<!--.*?-->\s*)*", "", clean, flags=re.DOTALL)
# Strip generic YAML frontmatter if present
clean = re.sub(r"^\s*---\n.*?\n---\n", "", clean, count=1, flags=re.DOTALL)
clean = clean.lstrip()
result = _db.context_update(self._conn, pid, clean, updated_by, base_version)
if result.get("ok"):
_db.audit_log(self._conn, updated_by, "import", f"Imported context for {pid}",
project_id=pid, entity_type="project", entity_id=pid)
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps(result))
elif result.get("error") == "conflict":
return (409, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (404, {"Content-Type": "application/json"}, json.dumps(result))
# ── Multi-file endpoints ────────────────────────────────────
# GET /projects/<id>/files — list all files
if method == "GET" and path.startswith("/projects/") and path.endswith("/files"):
pid = path.split("/")[2]
files = _db.file_list(self._conn, pid)
return (200, {"Content-Type": "application/json"}, json.dumps(files))
# GET /projects/<id>/files/<filename> — read a single file
if method == "GET" and path.startswith("/projects/") and "/files/" in path:
pid = path.split("/")[2]
file_path = path.split("/files/", 1)[1]
# URL-decode
import urllib.parse
file_path = urllib.parse.unquote(file_path)
result = _db.file_read(self._conn, pid, file_path)
if result is None:
return (404, {"Content-Type": "application/json"}, json.dumps({"error": "file not found"}))
return (200, {"Content-Type": "application/json"}, json.dumps(result))
# POST /projects/<id>/files — create a new file
if method == "POST" and path.startswith("/projects/") and path.endswith("/files"):
pid = path.split("/")[2]
payload = json.loads(body or b"{}")
file_path = payload.get("file_path", "")
content = payload.get("content", "")
updated_by = payload.get("updated_by", "admin")
if not file_path:
return (400, {"Content-Type": "application/json"}, json.dumps({"error": "file_path required"}))
result = _db.file_create(self._conn, pid, file_path, content, updated_by)
if result.get("ok"):
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (400, {"Content-Type": "application/json"}, json.dumps(result))
# PUT /projects/<id>/files/<filename> — update a file
if method == "PUT" and path.startswith("/projects/") and "/files/" in path:
pid = path.split("/")[2]
file_path = path.split("/files/", 1)[1]
import urllib.parse
file_path = urllib.parse.unquote(file_path)
payload = json.loads(body or b"{}")
content = payload.get("content", "")
updated_by = payload.get("updated_by", "admin")
base_version = payload.get("base_version", 0)
result = _db.file_update(self._conn, pid, file_path, content, updated_by, base_version)
if result.get("ok"):
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps(result))
elif result.get("error") == "conflict":
return (409, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (404, {"Content-Type": "application/json"}, json.dumps(result))
# DELETE /projects/<id>/files/<filename> — delete a file
if method == "DELETE" and path.startswith("/projects/") and "/files/" in path:
pid = path.split("/")[2]
file_path = path.split("/files/", 1)[1]
import urllib.parse
file_path = urllib.parse.unquote(file_path)
result = _db.file_delete(self._conn, pid, file_path)
if result.get("ok"):
self._conn.commit()
return (200, {"Content-Type": "application/json"}, json.dumps(result))
else:
return (400, {"Content-Type": "application/json"}, json.dumps(result))
# GET /projects/<id>/compiled — compiled view of all files
if method == "GET" and path.startswith("/projects/") and path.endswith("/compiled"):
pid = path.split("/")[2]
ctx = _db.compiled_read(self._conn, pid)
if ctx is None:
return (404, {"Content-Type": "text/plain"}, f"Project '{pid}' not found")
return (200, {"Content-Type": "application/json"}, json.dumps(ctx))
# POST /projects/<id>/migrate-files — create default files from existing context
if method == "POST" and path.startswith("/projects/") and path.endswith("/migrate-files"):
pid = path.split("/")[2]
_db.ensure_default_files(self._conn, pid)
self._conn.commit()
files = _db.file_list(self._conn, pid)
return (200, {"Content-Type": "application/json"}, json.dumps({"ok": True, "files": files}))
# GET /search?q=... — full-text search
if method == "GET" and path.startswith("/search"):
import urllib.parse
parsed = urllib.parse.urlparse(path)
qs = urllib.parse.parse_qs(parsed.query)
query = qs.get("q", [""])[0]
if not query:
return (400, {"Content-Type": "text/plain"}, "Missing ?q= parameter")
results = _db.search(self._conn, query)
return (200, {"Content-Type": "application/json"}, json.dumps(results))
# GET /audit — recent audit log
if method == "GET" and path.startswith("/audit"):
import urllib.parse
parsed = urllib.parse.urlparse(path)
qs = urllib.parse.parse_qs(parsed.query)
limit = int(qs.get("limit", [20])[0])
results = _db.audit_query(self._conn, limit=limit)
return (200, {"Content-Type": "application/json"}, json.dumps(results))
return (404, {"Content-Type": "text/plain"}, f"Not found: {method} {path}")
# ── ASGI app (wraps both MCP + HTTP) ──────────────────────────────────────────
class CombinedApp:
"""ASGI app that routes to MCP (SSE) or HTTP based on path."""
def __init__(self, cfg: CtxConfig):
self.cfg = cfg
self.http_handler = HTTPServer(cfg)
self.mcp_app = make_mcp_server(cfg)
self._sse_transport = None
self._mcp_init_opts = self.mcp_app.create_initialization_options()
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
await self._handle_http(scope, receive, send)
elif scope["type"] == "lifespan":
await self._handle_lifespan(scope, receive, send)
async def _handle_lifespan(self, scope, receive, send):
while True:
msg = await receive()
if msg["type"] == "lifespan.startup":
await send({"type": "lifespan.startup.complete"})
elif msg["type"] == "lifespan.shutdown":
await send({"type": "lifespan.shutdown.complete"})
return
async def _handle_http(self, scope, receive, send):
"""Parse request, dispatch, send response."""
path = scope.get("path", "/")
method = scope.get("method", "GET")
raw_headers = {k.decode().lower(): v.decode() for k, v in scope.get("headers", [])}
# Auth check — exempt Web UI, health check, and MCP POST backchannel
# (MCP SSE auth is validated on the GET /sse connection, POST messages
# are part of the same authenticated session)
_AUTH_EXEMPT = (
(method == "GET" and path == "/"),
(method == "GET" and path == "/status"),
(method == "POST" and path in ("/mcp/messages", "/mcp/messages/")),
)
if self.cfg.auth_enabled and not any(_AUTH_EXEMPT):
token = raw_headers.get("authorization", "")
if token.startswith("Bearer "):
token = token[7:]
# Also accept ?key= query param (for MCP SSE clients)
if not token:
qs = scope.get("query_string", b"").decode()
for pair in qs.split("&"):
if pair.startswith("key="):
token = pair[4:]
break
if token != self.cfg.api_key:
await send({
"type": "http.response.start",
"status": 401,
"headers": [
(b"content-type", b"application/json"),
(b"access-control-allow-origin", b"*"),
],
})
await send({"type": "http.response.body", "body": b'{"error": "unauthorized"}'})
return
# Route MCP SSE before reading body — SSE needs the raw ASGI stream
if (method == "GET" and path == "/sse") or (method == "POST" and path in ("/mcp/messages", "/mcp/messages/")):
await self._serve_mcp_sse(scope, receive, send)
return
# Read body for all other requests
body = b""
more_body = True
while more_body:
msg = await receive()
if msg["type"] == "http.request":
body += msg.get("body", b"")
more_body = msg.get("more_body", False)
query_string = scope.get("query_string", b"")
if query_string:
path = path + "?" + query_string.decode()
# Regular HTTP
status, headers, body_str = self.http_handler.handle(method, path, body)
response_headers = [
(key.lower().encode(), value.encode())
for key, value in headers.items()
]
response_headers.append((b"access-control-allow-origin", b"*"))
await send({
"type": "http.response.start",
"status": status,
"headers": response_headers,
})
await send({
"type": "http.response.body",
"body": body_str.encode(),
})
async def _serve_mcp_sse(self, scope, receive, send):
"""MCP over SSE transport (streaming server-sent events)."""
from mcp.server.sse import SseServerTransport
if self._sse_transport is None:
self._sse_transport = SseServerTransport("/mcp/messages")
sse = self._sse_transport
method = scope.get("method", "GET")
path = scope.get("path", "/")
if method == "GET" and path == "/sse":
async with sse.connect_sse(scope, receive, send) as streams:
await self.mcp_app.run(streams[0], streams[1], self._mcp_init_opts)
return
if method == "POST" and path in ("/mcp/messages", "/mcp/messages/"):
await sse.handle_post_message(scope, receive, send)
return
await send({
"type": "http.response.start",
"status": 404,
"headers": [(b"content-type", b"text/plain")],
})
await send({"type": "http.response.body", "body": b"Not found"})
# ── Entry point ────────────────────────────────────────────────────────────────
async def serve(cfg: CtxConfig | None = None):
"""Start the ctxd daemon. Requires uvicorn + mcp."""
cfg = cfg or CtxConfig.from_home()
try:
import uvicorn
except ImportError:
print("ERROR: ctxd serve requires uvicorn. Install with:")
print(" pip install uvicorn")
print(" pip install ctxd[mcp]")
sys.exit(1)
app = CombinedApp(cfg)
logger.info(f"ctxd starting on {cfg.host}:{cfg.port}")
config = uvicorn.Config(app, host=cfg.host, port=cfg.port, log_level="info")
server = uvicorn.Server(config)
await server.serve()
def serve_sync(cfg: CtxConfig | None = None):
"""Synchronous entry point for ctx serve CLI command."""
asyncio.run(serve(cfg))
+1673
View File
File diff suppressed because it is too large Load Diff