"""FastAPI surface for the passkey -> Nostr prototype. Single-process, in-memory. Restarting the server wipes registrations and pending challenges, which is the right default for a proof-of-concept: the only value worth persisting is the user's optional salt string. Endpoints: POST /register/start -> WebAuthn creation options (with PRF ext) POST /register/finish -> verify attestation, remember credential POST /auth/start -> WebAuthn request options (PRF eval) POST /auth/finish -> verify assertion, derive + return Nostr key Run locally: uvicorn nostr_passkey.app:app --reload --port 8000 """ from __future__ import annotations import logging import secrets from pathlib import Path from typing import Any from fastapi import FastAPI, HTTPException, Request from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # Verbose logging so every ceremony step shows up in the uvicorn output. # ``force=True`` makes this win over uvicorn's default logging config. logging.basicConfig( level=logging.DEBUG, format="%(asctime)s %(levelname)-5s %(name)s · %(message)s", force=True, ) log = logging.getLogger("nostr_passkey") from .derivation import derive_nostr_key from .webauthn_flow import ( PendingChallenge, StoredCredential, extract_prf_output, new_authentication_options, new_registration_options, verify_authentication, verify_registration, ) app = FastAPI(title="Nostr Passkey Prototype") @app.middleware("http") async def _trace(request: Request, call_next): """Log every request with method, path, and response status.""" log.info("→ %s %s", request.method, request.url.path) response = await call_next(request) log.info("← %s %s %s", request.method, request.url.path, response.status_code) return response # In-memory stores. NOT for production. _pending: dict[str, PendingChallenge] = {} _credentials: dict[str, StoredCredential] = {} # username -> credential _salts: dict[str, str] = {} # username -> last-used salt (optional) # Fixed PRF eval input. Any stable bytes work — the authenticator hashes # this with the "WebAuthn PRF\x00" context before HMAC, and keeping it # constant across assertions is what makes the PRF output stable for a # given credential. Changing this constant rotates every derived key. PRF_EVAL_FIRST = b"nostr-passkey/v1/prf-eval" class RegisterStartRequest(BaseModel): username: str class RegisterFinishRequest(BaseModel): username: str session_id: str credential: dict[str, Any] class AuthStartRequest(BaseModel): username: str class AuthFinishRequest(BaseModel): username: str session_id: str credential: dict[str, Any] salt: str | None = None class NostrIdentityResponse(BaseModel): npub: str nsec: str privkey_hex: str pubkey_hex: str @app.post("/register/start") def register_start(req: RegisterStartRequest) -> dict[str, Any]: log.debug("register/start username=%r", req.username) options, pending = new_registration_options( username=req.username, prf_eval_first=PRF_EVAL_FIRST, ) session_id = secrets.token_urlsafe(16) _pending[session_id] = pending log.debug( "register/start issued session=%s rp.id=%s challenge_len=%s ext=%s", session_id, options.get("rp", {}).get("id"), len(options.get("challenge", "")), list(options.get("extensions", {}).keys()), ) return {"session_id": session_id, "options": options} @app.post("/register/finish") def register_finish(req: RegisterFinishRequest) -> dict[str, str]: log.debug("register/finish username=%r session=%s", req.username, req.session_id) pending = _pending.pop(req.session_id, None) if pending is None: log.warning("register/finish unknown session %s", req.session_id) raise HTTPException(status_code=400, detail="unknown session") try: stored = verify_registration(req.credential, pending) except Exception as exc: log.exception("register/finish verification failed: %s", exc) raise HTTPException(status_code=400, detail=f"verify failed: {exc}") from exc _credentials[req.username] = stored log.debug("register/finish ok cred_id=%s", stored.credential_id.hex()[:16]) return {"status": "registered", "username": req.username} @app.post("/auth/start") def auth_start(req: AuthStartRequest) -> dict[str, Any]: log.debug("auth/start username=%r", req.username) stored = _credentials.get(req.username) if stored is None: log.warning("auth/start unknown user %r", req.username) raise HTTPException(status_code=404, detail="unknown user") options, pending = new_authentication_options(stored, PRF_EVAL_FIRST) session_id = secrets.token_urlsafe(16) _pending[session_id] = pending log.debug( "auth/start issued session=%s rpId=%s allowCreds=%s", session_id, options.get("rpId"), len(options.get("allowCredentials", []) or []), ) return {"session_id": session_id, "options": options} @app.post("/auth/finish", response_model=NostrIdentityResponse) def auth_finish(req: AuthFinishRequest) -> NostrIdentityResponse: log.debug("auth/finish username=%r session=%s salt=%r", req.username, req.session_id, req.salt) pending = _pending.pop(req.session_id, None) if pending is None: log.warning("auth/finish unknown session %s", req.session_id) raise HTTPException(status_code=400, detail="unknown session") stored = _credentials.get(req.username) if stored is None: log.warning("auth/finish unknown user %r", req.username) raise HTTPException(status_code=404, detail="unknown user") try: new_count = verify_authentication(req.credential, pending, stored) except Exception as exc: log.exception("auth/finish assertion verification failed: %s", exc) raise HTTPException(status_code=400, detail=f"verify failed: {exc}") from exc stored.sign_count = new_count log.debug("auth/finish signature ok new_sign_count=%s", new_count) try: prf_output = extract_prf_output(req.credential) except KeyError as exc: log.warning("auth/finish missing PRF output: %s", exc) raise HTTPException(status_code=400, detail=str(exc)) from exc log.debug("auth/finish prf_output=%s bytes", len(prf_output)) # Prefer an explicitly-supplied salt; fall back to whatever the user # last used. Remember the choice so future calls without a salt # stay consistent. This is the *only* state we persist beyond the # current ceremony. salt = req.salt if req.salt is not None else _salts.get(req.username) if salt is not None: _salts[req.username] = salt identity = derive_nostr_key(prf_output, salt=salt) log.info("auth/finish derived npub=%s", identity.npub) return NostrIdentityResponse( npub=identity.npub, nsec=identity.nsec, privkey_hex=identity.privkey_hex, pubkey_hex=identity.pubkey_hex, ) # Static frontend. Mounted last so the POST API routes defined above take # precedence — StaticFiles only serves GET, so there is no collision. _WEB_DIR = Path(__file__).resolve().parent.parent / "web" if _WEB_DIR.is_dir(): app.mount("/", StaticFiles(directory=_WEB_DIR, html=True), name="web")