"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities.""" from __future__ import annotations import secrets from datetime import datetime, timedelta, timezone from fastapi import APIRouter, Depends, HTTPException, Query, Response, status from sqlalchemy import or_, select from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.database import get_db from app.models.user import DuniterIdentity from app.schemas.auth import ( ChallengeRequest, ChallengeResponse, IdentityOut, TokenResponse, VerifyRequest, ) from app.services.auth_service import ( create_session, get_current_identity, get_or_create_identity, invalidate_session, ) router = APIRouter() # ── Dev profiles (only available when ENVIRONMENT == "development") ───────── DEV_PROFILES = [ { "address": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", "display_name": "Alice (Membre WoT)", "wot_status": "member", "is_smith": False, "is_techcomm": False, }, { "address": "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty", "display_name": "Bob (Forgeron)", "wot_status": "member", "is_smith": True, "is_techcomm": False, }, { "address": "5FLSigC9HGRKVhB9FiEo4Y3koPsNmBmkP7j4bJa3zN7d8tY", "display_name": "Charlie (Référent structure)", "wot_status": "member", "is_smith": True, "is_techcomm": True, }, { "address": "5DAAnrj7VHTznn2AWBemMuyBwZWs6FNFjdyVXUeYum3PTXFy", "display_name": "Dave (Auteur)", "wot_status": "member", "is_smith": False, "is_techcomm": False, }, ] # ── In-memory challenge store (short-lived, no persistence needed) ────────── # Structure: { address: { "challenge": str, "expires_at": datetime } } _pending_challenges: dict[str, dict] = {} def _cleanup_expired_challenges() -> None: """Remove expired challenges from the in-memory store.""" now = datetime.now(timezone.utc) expired = [addr for addr, data in _pending_challenges.items() if data["expires_at"] < now] for addr in expired: del _pending_challenges[addr] # ── Routes ────────────────────────────────────────────────────────────────── @router.post("/challenge", response_model=ChallengeResponse) async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse: """Generate a random Ed25519 challenge for the given Duniter address. The client must sign this challenge with the private key corresponding to the address, then submit it via POST /verify. """ _cleanup_expired_challenges() challenge = secrets.token_hex(32) expires_at = datetime.now(timezone.utc) + timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS) _pending_challenges[payload.address] = { "challenge": challenge, "expires_at": expires_at, } return ChallengeResponse(challenge=challenge, expires_at=expires_at) @router.post("/verify", response_model=TokenResponse) async def verify_challenge( payload: VerifyRequest, db: AsyncSession = Depends(get_db), ) -> TokenResponse: """Verify the Ed25519 signature of a challenge and return a session token. Steps: 1. Check that a pending challenge exists for the address. 2. Verify the challenge string matches. 3. Verify the Ed25519 signature against the address public key. 4. Create or retrieve the DuniterIdentity. 5. Create a session and return the bearer token. """ # 1. Retrieve pending challenge pending = _pending_challenges.get(payload.address) if pending is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Aucun challenge en attente pour cette adresse", ) # 2. Check expiry if pending["expires_at"] < datetime.now(timezone.utc): del _pending_challenges[payload.address] raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge expire, veuillez en demander un nouveau", ) # 3. Verify challenge string matches if pending["challenge"] != payload.challenge: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Challenge invalide", ) # 4. Verify signature # TODO: trustWallet — déléguer la vérification au protocole trustWallet (librodrome) # Quand trustWallet sera disponible : remplacer le bloc ci-dessous par une vérification # du token signé fourni par trustWallet (JWT ou preuve Ed25519 via iframe postMessage). # Le bypass DEMO_MODE sera alors supprimé. _demo_addresses = {p["address"] for p in DEV_PROFILES} is_demo_bypass = (settings.DEMO_MODE or settings.ENVIRONMENT == "development") and payload.address in _demo_addresses if not is_demo_bypass: # polkadot.js / Cesium2 signRaw(type='bytes') wraps: {challenge} message = f"{payload.challenge}".encode("utf-8") sig_hex = payload.signature.removeprefix("0x") try: sig_bytes = bytes.fromhex(sig_hex) except ValueError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Format de signature invalide (hex attendu)", ) from substrateinterface import Keypair, KeypairType verified = False # Try Sr25519 first (default Substrate/Cesium2), then Ed25519 (Duniter v1 migration) for key_type in [KeypairType.SR25519, KeypairType.ED25519]: try: kp = Keypair(ss58_address=payload.address, crypto_type=key_type) if kp.verify(message, sig_bytes): verified = True break except Exception: continue if not verified: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Signature invalide", ) # 5. Consume the challenge del _pending_challenges[payload.address] # 6. Get or create identity (apply dev profile if available) dev_profile = None if settings.ENVIRONMENT == "development" or settings.DEMO_MODE: dev_profile = next((p for p in DEV_PROFILES if p["address"] == payload.address), None) identity = await get_or_create_identity(db, payload.address, dev_profile=dev_profile) # 7. Create session token token = await create_session(db, identity) return TokenResponse( token=token, identity=IdentityOut.model_validate(identity), ) @router.get("/dev/profiles") async def list_dev_profiles(): """List available demo profiles for quick login. Available in development or demo mode.""" if settings.ENVIRONMENT != "development" and not settings.DEMO_MODE: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available") return DEV_PROFILES @router.get("/me", response_model=IdentityOut) async def get_me( identity: DuniterIdentity = Depends(get_current_identity), ) -> IdentityOut: """Return the currently authenticated identity.""" return IdentityOut.model_validate(identity) @router.post("/logout", status_code=status.HTTP_204_NO_CONTENT, response_class=Response, response_model=None) async def logout( db: AsyncSession = Depends(get_db), identity: DuniterIdentity = Depends(get_current_identity), ): """Invalidate the current session token. Note: get_current_identity already validated the token, so we know it exists. We re-extract it from the Authorization header to invalidate it. """ # We need the raw token to invalidate -- re-extract from the dependency chain. # Since get_current_identity already validated, we know the request has a valid Bearer token. # We use a slightly different approach: delete all sessions for this identity # that match. For a cleaner approach, we accept the token via a dedicated dependency. from fastapi import Request # This is handled by getting the token from the auth service # For simplicity, we delete all sessions for the identity from sqlalchemy import select from app.models.user import Session result = await db.execute(select(Session).where(Session.identity_id == identity.id)) sessions = result.scalars().all() for session in sessions: await db.delete(session) await db.commit() @router.get("/identities", response_model=list[IdentityOut]) async def search_identities( q: str = Query(..., min_length=1, description="Recherche par adresse ou nom"), limit: int = Query(default=10, ge=1, le=50), db: AsyncSession = Depends(get_db), ) -> list[IdentityOut]: """Search Duniter identities by address prefix or display_name.""" result = await db.execute( select(DuniterIdentity) .where( or_( DuniterIdentity.address.ilike(f"{q}%"), DuniterIdentity.display_name.ilike(f"%{q}%"), ) ) .order_by(DuniterIdentity.display_name) .limit(limit) ) return [IdentityOut.model_validate(i) for i in result.scalars().all()]