Files
CTXD/app/scripts/test_unified_mcp.py
T

150 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""Smoke test: one OAuth connector lists read+write tools on /mcp."""
import base64
import hashlib
import http.client
import json
import secrets
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
BASE = "http://127.0.0.1:9091"
env_path = Path("/mnt/ai-storage/projects/ctxd/app/.env")

# The approval key is the text after "=" on the first .env line that starts
# with this prefix; a missing line and an empty value are both fatal.
_KEY_PREFIX = "OAUTH_APPROVAL_KEY="
approval_key = next(
    (
        entry[len(_KEY_PREFIX):]
        for entry in env_path.read_text().splitlines()
        if entry.startswith(_KEY_PREFIX)
    ),
    "",
)
if approval_key == "":
    raise SystemExit("no OAUTH_APPROVAL_KEY")
class NR(urllib.request.HTTPRedirectHandler):
    """Redirect handler that declines to follow any redirect.

    The script needs the redirect target itself rather than the page behind
    it: it catches the HTTPError raised for the unfollowed response and reads
    that response's ``Location`` header.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        """Return ``None`` for every redirect, i.e. build no follow-up request."""
        return None
# Opener built on NR, so redirects are surfaced to the caller instead of being
# followed; every OAuth request in this script goes through it.
opener = urllib.request.build_opener(NR)

# Register a client that asks for the read and the write scope at once.
registration = {
    "redirect_uris": ["http://localhost:9999/cb"],
    "client_name": "unified-mcp-test",
    "scope": "ctxd.read ctxd.write",
}
# The response is used as a context manager so its socket is closed as soon as
# the body has been read, rather than whenever the object is collected.
with opener.open(
    urllib.request.Request(
        f"{BASE}/oauth/register",
        data=json.dumps(registration).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    ),
    timeout=15,
) as r:
    client = json.loads(r.read())

# Fail with a readable message (the script's convention) instead of a bare
# KeyError/TypeError when the reply carries no credentials.
try:
    cid, secret = client["client_id"], client["client_secret"]
except (KeyError, TypeError) as exc:
    raise SystemExit(f"register: reply has no client credentials ({exc!r})") from exc
# PKCE pair: `cv` is the random verifier, `cc` the challenge, i.e. the SHA-256
# digest of the verifier in URL-safe base64 with the "=" padding removed.
cv = secrets.token_urlsafe(32)
_verifier_digest = hashlib.sha256(cv.encode()).digest()
cc = base64.urlsafe_b64encode(_verifier_digest).decode().rstrip("=")
# Authorization request parameters: code flow with an S256 PKCE challenge,
# asking for both scopes on the single registered client.
params = {
    "response_type": "code",
    "client_id": cid,
    "redirect_uri": "http://localhost:9999/cb",
    "code_challenge": cc,
    "code_challenge_method": "S256",
    "scope": "ctxd.read ctxd.write",
    "state": "t",
}
authorize_request = urllib.request.Request(
    f"{BASE}/oauth/authorize?" + urllib.parse.urlencode(params),
    data=urllib.parse.urlencode({"approval_key": approval_key}).encode(),
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    method="POST",
)

# `opener` does not follow redirects, so the expected redirect arrives as an
# HTTPError whose Location header carries the authorization code.
try:
    authorize_reply = opener.open(authorize_request, timeout=15)
except urllib.error.HTTPError as e:
    authorize_status, loc = e.code, e.headers.get("Location", "")
else:
    # A non-redirect success used to leave `code` unbound and crash later
    # with a NameError; treat it as "no Location" and report it below.
    authorize_status, loc = authorize_reply.status, ""
    authorize_reply.close()

code = dict(urllib.parse.parse_qsl(urllib.parse.urlparse(loc).query)).get("code", "")
if not code:
    # Covers 4xx/5xx replies, redirects without a `code` parameter, and
    # non-redirect replies — all of which previously reached the token request.
    raise SystemExit(
        f"authorize: no authorization code returned "
        f"(HTTP {authorize_status}, Location: {loc or 'absent'})"
    )
# Exchange the authorization code for an access token; the PKCE verifier `cv`
# is sent so the server can match it against the challenge from the authorize step.
token_form = {
    "grant_type": "authorization_code",
    "code": code,
    "redirect_uri": "http://localhost:9999/cb",
    "client_id": cid,
    "client_secret": secret,
    "code_verifier": cv,
}
# Context manager: the response socket is closed once the body has been read.
with opener.open(
    urllib.request.Request(
        f"{BASE}/oauth/token",
        data=urllib.parse.urlencode(token_form).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    ),
    timeout=15,
) as r3:
    token = json.loads(r3.read())["access_token"]
def mcp_post(path: str, body: dict, session_id: str | None = None) -> tuple[int, str, str | None]:
    """POST a JSON message to the server with the bearer token and return the raw reply.

    Args:
        path: Request path on the server, e.g. ``"/mcp"``.
        body: JSON-serialisable message to send as the request body.
        session_id: Value for the ``mcp-session-id`` request header; the
            header is omitted when this is ``None`` or empty.

    Returns:
        ``(status, text, session)``: the HTTP status code, up to the first
        8000 bytes of the response body decoded as text (undecodable bytes
        are replaced), and the ``mcp-session-id`` response header or ``None``.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }
    if session_id:
        headers["mcp-session-id"] = session_id

    # Take host and port from BASE so the MCP calls always target the same
    # server as the OAuth calls instead of a second hard-coded address.
    target = urllib.parse.urlsplit(BASE)
    conn = http.client.HTTPConnection(target.hostname, target.port, timeout=15)
    try:
        conn.request("POST", path, body=json.dumps(body), headers=headers)
        resp = conn.getresponse()
        reply_session = resp.getheader("mcp-session-id")
        # NOTE: the read is capped at 8000 bytes; a longer body comes back
        # truncated and will not parse as JSON in the caller.
        data = resp.read(8000).decode(errors="replace")
    finally:
        # Close the connection even when the request or the read raises.
        conn.close()
    return resp.status, data, reply_session
# Build the JSON-RPC `initialize` request from its parts, then send it.
_client_info = {"name": "unified-test", "version": "1"}
_init_params = {
    "protocolVersion": "2025-03-26",
    "capabilities": {},
    "clientInfo": _client_info,
}
init_body = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": _init_params,
}

st, raw, sid = mcp_post("/mcp", init_body)
print("initialize:", st, "session:", sid)
# Anything other than 200 aborts the run, showing the start of the reply body.
if not st == 200:
    raise SystemExit(raw[:500])
# Ask for the tool list. When initialize returned a session header, only its
# first comma-separated value (trimmed) is echoed back; otherwise no session
# id is passed at all.
_tools_list_body = {"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}}
_session = sid.split(",")[0].strip() if sid else None
st2, raw2, _ = mcp_post("/mcp", _tools_list_body, session_id=_session)
print("tools/list:", st2)
def _tool_names(payload: object) -> list[str]:
    """Return the string tool names found under ``result.tools`` of *payload*.

    Anything that does not have the expected shape (non-dict payload, missing
    or null ``result``, non-list ``tools``, entries without a string ``name``)
    contributes no names instead of raising.
    """
    if not isinstance(payload, dict):
        return []
    result = payload.get("result")
    if not isinstance(result, dict):
        return []
    tools = result.get("tools")
    if not isinstance(tools, list):
        return []
    return [
        tool["name"]
        for tool in tools
        if isinstance(tool, dict) and isinstance(tool.get("name"), str)
    ]


# The reply is read two ways: first as event-stream text, taking the JSON
# after each "data:" prefix; then, if that found nothing, as one JSON document.
names = []
for line in raw2.splitlines():
    if not line.startswith("data:"):
        continue
    try:
        names.extend(_tool_names(json.loads(line[5:].strip())))
    except json.JSONDecodeError:
        # Best effort: a data line that is not valid JSON is skipped.
        continue
if '"tools"' in raw2 and not names:
    try:
        names = _tool_names(json.loads(raw2))
    except json.JSONDecodeError:
        # Best effort: leave `names` empty; the required-tools check reports it.
        pass
# Only strings reach this point, so sorting cannot hit a None/str comparison.
print("tools:", sorted(set(names)))
# Final verdict: the tool names in `need` must all be present in the listing.
need = {"list_projects", "update_file", "sync_to_project"}
missing = need - set(names)
if not missing:
    print("OK unified connector exposes read+write tools")
else:
    raise SystemExit(f"missing tools: {missing}")