#!/usr/bin/env python3 """Smoke test: one OAuth connector lists read+write tools on /mcp.""" import json, urllib.request, urllib.parse, hashlib, base64, secrets, http.client from pathlib import Path BASE = "http://127.0.0.1:9091" env_path = Path("/mnt/ai-storage/projects/ctxd/app/.env") approval_key = "" for line in env_path.read_text().splitlines(): if line.startswith("OAUTH_APPROVAL_KEY="): approval_key = line.split("=", 1)[1] break if not approval_key: raise SystemExit("no OAUTH_APPROVAL_KEY") class NR(urllib.request.HTTPRedirectHandler): def redirect_request(self, req, fp, code, msg, headers, newurl): return None opener = urllib.request.build_opener(NR) r = opener.open( urllib.request.Request( f"{BASE}/oauth/register", data=json.dumps( { "redirect_uris": ["http://localhost:9999/cb"], "client_name": "unified-mcp-test", "scope": "ctxd.read ctxd.write", } ).encode(), headers={"Content-Type": "application/json"}, method="POST", ), timeout=15, ) client = json.loads(r.read()) cid, secret = client["client_id"], client["client_secret"] cv = secrets.token_urlsafe(32) cc = base64.urlsafe_b64encode(hashlib.sha256(cv.encode()).digest()).rstrip(b"=").decode() params = { "response_type": "code", "client_id": cid, "redirect_uri": "http://localhost:9999/cb", "code_challenge": cc, "code_challenge_method": "S256", "scope": "ctxd.read ctxd.write", "state": "t", } try: opener.open( urllib.request.Request( f"{BASE}/oauth/authorize?" + urllib.parse.urlencode(params), data=urllib.parse.urlencode({"approval_key": approval_key}).encode(), headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ), timeout=15, ) except urllib.error.HTTPError as e: loc = e.headers.get("Location", "") code = dict(urllib.parse.parse_qsl(urllib.parse.urlparse(loc).query)).get("code", "") r3 = opener.open( urllib.request.Request( f"{BASE}/oauth/token", data=urllib.parse.urlencode( { "grant_type": "authorization_code", "code": code, "redirect_uri": "http://localhost:9999/cb", "client_id": cid, "client_secret": secret, "code_verifier": cv, } ).encode(), headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ), timeout=15, ) token = json.loads(r3.read())["access_token"] def mcp_post(path: str, body: dict, session_id: str | None = None) -> tuple[int, str, str | None]: headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "Accept": "application/json, text/event-stream", } if session_id: headers["mcp-session-id"] = session_id conn = http.client.HTTPConnection("127.0.0.1", 9091, timeout=15) conn.request("POST", path, body=json.dumps(body), headers=headers) resp = conn.getresponse() sid = resp.getheader("mcp-session-id") data = resp.read(8000).decode(errors="replace") conn.close() return resp.status, data, sid init_body = { "jsonrpc": "2.0", "id": 1, "method": "initialize", "params": { "protocolVersion": "2025-03-26", "capabilities": {}, "clientInfo": {"name": "unified-test", "version": "1"}, }, } st, raw, sid = mcp_post("/mcp", init_body) print("initialize:", st, "session:", sid) if st != 200: raise SystemExit(raw[:500]) if sid: st2, raw2, _ = mcp_post( "/mcp", {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}, session_id=sid.split(",")[0].strip(), ) else: st2, raw2, _ = mcp_post("/mcp", {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}) print("tools/list:", st2) names = [] for line in raw2.splitlines(): if line.startswith("data:"): try: payload = json.loads(line[5:].strip()) for t in payload.get("result", {}).get("tools", []): names.append(t.get("name")) except json.JSONDecodeError: pass if '"tools"' in raw2 and not names: try: payload = json.loads(raw2) names = [t["name"] for t in payload.get("result", {}).get("tools", [])] except json.JSONDecodeError: pass print("tools:", sorted(set(names))) need = {"list_projects", "update_file", "sync_to_project"} missing = need - set(names) if missing: raise SystemExit(f"missing tools: {missing}") print("OK unified connector exposes read+write tools")