"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities.""" from __future__ import annotations import secrets from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, Response, status from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database import get_db from app.models.user import DuniterIdentity from app.schemas.auth import ( ChallengeRequest, ChallengeResponse, IdentityOut, TokenResponse, VerifyRequest, ) from app.services.auth_service import ( create_session, get_current_identity, get_or_create_identity, invalidate_session, ) router = APIRouter() # ── Dev profiles (only available when ENVIRONMENT == "development") ───────── DEV_PROFILES = [ { "address": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", "display_name": "Alice (Membre WoT)", "wot_status": "member", "is_smith": False, "is_techcomm": False, }, { "address": "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty", "display_name": "Bob (Forgeron)", "wot_status": "member", "is_smith": True, "is_techcomm": False, }, { "address": "5FLSigC9HGRKVhB9FiEo4Y3koPsNmBmkP7j4bJa3zN7d8tY", "display_name": "Charlie (Comite Tech)", "wot_status": "member", "is_smith": True, "is_techcomm": True, }, { "address": "5DAAnrj7VHTznn2AWBemMuyBwZWs6FNFjdyVXUeYum3PTXFy", "display_name": "Dave (Observateur)", "wot_status": "unknown", "is_smith": False, "is_techcomm": False, }, ] # ── In-memory challenge store (short-lived, no persistence needed) ────────── # Structure: { address: { "challenge": str, "expires_at": datetime } } _pending_challenges: dict[str, dict] = {} def _cleanup_expired_challenges() -> None: """Remove expired challenges from the in-memory store.""" now = datetime.now(timezone.utc) expired = [addr for addr, data in _pending_challenges.items() if data["expires_at"] < now] for addr in expired: del _pending_challenges[addr] # ── Routes ────────────────────────────────────────────────────────────────── @router.post("/challenge", response_model=ChallengeResponse) async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse: """Generate a random Ed25519 challenge for the given Duniter address. The client must sign this challenge with the private key corresponding to the address, then submit it via POST /verify. """ _cleanup_expired_challenges() challenge = secrets.token_hex(32) expires_at = datetime.now(timezone.utc) + timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS) _pending_challenges[payload.address] = { "challenge": challenge, "expires_at": expires_at, } return ChallengeResponse(challenge=challenge, expires_at=expires_at) @router.post("/verify", response_model=TokenResponse) async def verify_challenge( payload: VerifyRequest, db: AsyncSession = Depends(get_db), ) -> TokenResponse: """Verify the Ed25519 signature of a challenge and return a session token. Steps: 1. Check that a pending challenge exists for the address. 2. Verify the challenge string matches. 3. Verify the Ed25519 signature against the address public key. 4. Create or retrieve the DuniterIdentity. 5. Create a session and return the bearer token. """ # 1. Retrieve pending challenge pending = _pending_challenges.get(payload.address) if pending is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Aucun challenge en attente pour cette adresse", ) # 2. Check expiry if pending["expires_at"] < datetime.now(timezone.utc): del _pending_challenges[payload.address] raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge expire, veuillez en demander un nouveau", ) # 3. Verify challenge string matches if pending["challenge"] != payload.challenge: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge invalide", ) # 4. Verify Ed25519 signature # TODO: Implement actual Ed25519 verification using substrate-interface # For now we accept any signature to allow development/testing. # In production this MUST verify: verify(address_pubkey, challenge_bytes, signature_bytes) # # from substrateinterface import Keypair # keypair = Keypair(ss58_address=payload.address) # if not keypair.verify(payload.challenge.encode(), bytes.fromhex(payload.signature)): # raise HTTPException(status_code=401, detail="Signature invalide") # 5. Consume the challenge del _pending_challenges[payload.address] # 6. Get or create identity (apply dev profile if available) dev_profile = None if settings.ENVIRONMENT == "development": dev_profile = next((p for p in DEV_PROFILES if p["address"] == payload.address), None) identity = await get_or_create_identity(db, payload.address, dev_profile=dev_profile) # 7. Create session token token = await create_session(db, identity) return TokenResponse( token=token, identity=IdentityOut.model_validate(identity), ) @router.get("/dev/profiles") async def list_dev_profiles(): """List available dev profiles for quick login. Only available in development.""" if settings.ENVIRONMENT != "development": raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available") return DEV_PROFILES @router.get("/me", response_model=IdentityOut) async def get_me( identity: DuniterIdentity = Depends(get_current_identity), ) -> IdentityOut: """Return the currently authenticated identity.""" return IdentityOut.model_validate(identity) @router.post("/logout", status_code=status.HTTP_204_NO_CONTENT, response_class=Response, response_model=None) async def logout( db: AsyncSession = Depends(get_db), identity: DuniterIdentity = Depends(get_current_identity), ): """Invalidate the current session token. Note: get_current_identity already validated the token, so we know it exists. We re-extract it from the Authorization header to invalidate it. """ # We need the raw token to invalidate -- re-extract from the dependency chain. # Since get_current_identity already validated, we know the request has a valid Bearer token. # We use a slightly different approach: delete all sessions for this identity # that match. For a cleaner approach, we accept the token via a dedicated dependency. from fastapi import Request # This is handled by getting the token from the auth service # For simplicity, we delete all sessions for the identity from sqlalchemy import select from app.models.user import Session result = await db.execute(select(Session).where(Session.identity_id == identity.id)) sessions = result.scalars().all() for session in sessions: await db.delete(session) await db.commit()