#!/usr/bin/env python3 """E2E: OAuth write token -> /write/sse and tools/list via SSE.""" import json, urllib.request, urllib.parse, hashlib, base64, secrets, http.client, time from pathlib import Path BASE = "http://127.0.0.1:9091" env_path = Path("/mnt/ai-storage/projects/ctxd/app/.env") approval_key = "" for line in env_path.read_text().splitlines(): if line.startswith("OAUTH_APPROVAL_KEY="): approval_key = line.split("=", 1)[1] break if not approval_key: raise SystemExit("no OAUTH_APPROVAL_KEY") class NR(urllib.request.HTTPRedirectHandler): def redirect_request(self, *a): return None opener = urllib.request.build_opener(NR) r = opener.open(urllib.request.Request( f"{BASE}/oauth/register", data=json.dumps({"redirect_uris": ["http://localhost:9999/cb"], "client_name": "write-test", "scope": "ctxd.read ctxd.write"}).encode(), headers={"Content-Type": "application/json"}, method="POST"), timeout=15) client = json.loads(r.read()) cid, secret = client["client_id"], client["client_secret"] cv = secrets.token_urlsafe(32) cc = base64.urlsafe_b64encode(hashlib.sha256(cv.encode()).digest()).rstrip(b"=").decode() params = {"response_type": "code", "client_id": cid, "redirect_uri": "http://localhost:9999/cb", "code_challenge": cc, "code_challenge_method": "S256", "scope": "ctxd.read ctxd.write", "state": "t"} try: opener.open(urllib.request.Request(f"{BASE}/oauth/authorize?" + urllib.parse.urlencode(params), data=urllib.parse.urlencode({"approval_key": approval_key}).encode(), headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST"), timeout=15) except urllib.error.HTTPError as e: loc = e.headers.get("Location", "") code = dict(urllib.parse.parse_qsl(urllib.parse.urlparse(loc).query)).get("code", "") r3 = opener.open(urllib.request.Request(f"{BASE}/oauth/token", data=urllib.parse.urlencode({"grant_type": "authorization_code", "code": code, "redirect_uri": "http://localhost:9999/cb", "client_id": cid, "client_secret": secret, "code_verifier": cv}).encode(), headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST"), timeout=15) token = json.loads(r3.read())["access_token"] conn = http.client.HTTPConnection("127.0.0.1", 9091, timeout=5) conn.request("GET", "/write/sse", headers={"Authorization": f"Bearer {token}"}) resp = conn.getresponse() print("write/sse status:", resp.status, "ctype:", resp.getheader("content-type")) chunk = resp.read(500).decode(errors="replace") print("body[:200]:", chunk[:200]) conn.close() # POST initialize to /write/messages (needs session from SSE - simplified check) conn2 = http.client.HTTPConnection("127.0.0.1", 9091, timeout=10) body = json.dumps({"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}) conn2.request("POST", "/write/messages", body=body, headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"}) r2 = conn2.getresponse() print("write/messages POST:", r2.status, r2.read(300).decode(errors="replace")[:200]) conn2.close()