Files
decision/backend/app/routers/auth.py
Yvv f87cbc0f2f Typo Plus Jakarta Sans + renommage libreDecision + mode démo prod + seed mandats
- Fonte : Nunito → Plus Jakarta Sans (moderne, ronde sans être toy)
- Logo : ğ(Decision) → libreDecision (libre italic/muted + Decision bold)
- Footer et watermark DocumentPreview mis à jour
- Mode démo : DEMO_MODE flag dans config.py + auth.py (profils rapides en prod)
- docker-compose : ENVIRONMENT=production explicite + DEMO_MODE=true par défaut
- Seed : +décision Licence G1 v0.4.0, +3 mandats (ComTech, Admin Forgerons, Modération)
  runner 7→10 étapes, import Mandate/MandateStep ajouté

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-24 01:22:36 +01:00

206 lines
7.3 KiB
Python

"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities."""
from __future__ import annotations
import secrets
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity
from app.schemas.auth import (
ChallengeRequest,
ChallengeResponse,
IdentityOut,
TokenResponse,
VerifyRequest,
)
from app.services.auth_service import (
create_session,
get_current_identity,
get_or_create_identity,
invalidate_session,
)
router = APIRouter()
# ── Dev profiles (only available when ENVIRONMENT == "development") ─────────
DEV_PROFILES = [
{
"address": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
"display_name": "Alice (Membre WoT)",
"wot_status": "member",
"is_smith": False,
"is_techcomm": False,
},
{
"address": "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty",
"display_name": "Bob (Forgeron)",
"wot_status": "member",
"is_smith": True,
"is_techcomm": False,
},
{
"address": "5FLSigC9HGRKVhB9FiEo4Y3koPsNmBmkP7j4bJa3zN7d8tY",
"display_name": "Charlie (Comite Tech)",
"wot_status": "member",
"is_smith": True,
"is_techcomm": True,
},
{
"address": "5DAAnrj7VHTznn2AWBemMuyBwZWs6FNFjdyVXUeYum3PTXFy",
"display_name": "Dave (Observateur)",
"wot_status": "unknown",
"is_smith": False,
"is_techcomm": False,
},
]
# ── In-memory challenge store (short-lived, no persistence needed) ──────────
# Structure: { address: { "challenge": str, "expires_at": datetime } }
_pending_challenges: dict[str, dict] = {}
def _cleanup_expired_challenges() -> None:
"""Remove expired challenges from the in-memory store."""
now = datetime.now(timezone.utc)
expired = [addr for addr, data in _pending_challenges.items() if data["expires_at"] < now]
for addr in expired:
del _pending_challenges[addr]
# ── Routes ──────────────────────────────────────────────────────────────────
@router.post("/challenge", response_model=ChallengeResponse)
async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse:
"""Generate a random Ed25519 challenge for the given Duniter address.
The client must sign this challenge with the private key corresponding
to the address, then submit it via POST /verify.
"""
_cleanup_expired_challenges()
challenge = secrets.token_hex(32)
expires_at = datetime.now(timezone.utc) + timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS)
_pending_challenges[payload.address] = {
"challenge": challenge,
"expires_at": expires_at,
}
return ChallengeResponse(challenge=challenge, expires_at=expires_at)
@router.post("/verify", response_model=TokenResponse)
async def verify_challenge(
payload: VerifyRequest,
db: AsyncSession = Depends(get_db),
) -> TokenResponse:
"""Verify the Ed25519 signature of a challenge and return a session token.
Steps:
1. Check that a pending challenge exists for the address.
2. Verify the challenge string matches.
3. Verify the Ed25519 signature against the address public key.
4. Create or retrieve the DuniterIdentity.
5. Create a session and return the bearer token.
"""
# 1. Retrieve pending challenge
pending = _pending_challenges.get(payload.address)
if pending is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Aucun challenge en attente pour cette adresse",
)
# 2. Check expiry
if pending["expires_at"] < datetime.now(timezone.utc):
del _pending_challenges[payload.address]
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Challenge expire, veuillez en demander un nouveau",
)
# 3. Verify challenge string matches
if pending["challenge"] != payload.challenge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Challenge invalide",
)
# 4. Verify Ed25519 signature
# TODO: Implement actual Ed25519 verification using substrate-interface
# For now we accept any signature to allow development/testing.
# In production this MUST verify: verify(address_pubkey, challenge_bytes, signature_bytes)
#
# from substrateinterface import Keypair
# keypair = Keypair(ss58_address=payload.address)
# if not keypair.verify(payload.challenge.encode(), bytes.fromhex(payload.signature)):
# raise HTTPException(status_code=401, detail="Signature invalide")
# 5. Consume the challenge
del _pending_challenges[payload.address]
# 6. Get or create identity (apply dev profile if available)
dev_profile = None
if settings.ENVIRONMENT == "development" or settings.DEMO_MODE:
dev_profile = next((p for p in DEV_PROFILES if p["address"] == payload.address), None)
identity = await get_or_create_identity(db, payload.address, dev_profile=dev_profile)
# 7. Create session token
token = await create_session(db, identity)
return TokenResponse(
token=token,
identity=IdentityOut.model_validate(identity),
)
@router.get("/dev/profiles")
async def list_dev_profiles():
"""List available demo profiles for quick login. Available in development or demo mode."""
if settings.ENVIRONMENT != "development" and not settings.DEMO_MODE:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available")
return DEV_PROFILES
@router.get("/me", response_model=IdentityOut)
async def get_me(
identity: DuniterIdentity = Depends(get_current_identity),
) -> IdentityOut:
"""Return the currently authenticated identity."""
return IdentityOut.model_validate(identity)
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT, response_class=Response, response_model=None)
async def logout(
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
):
"""Invalidate the current session token.
Note: get_current_identity already validated the token, so we know it exists.
We re-extract it from the Authorization header to invalidate it.
"""
# We need the raw token to invalidate -- re-extract from the dependency chain.
# Since get_current_identity already validated, we know the request has a valid Bearer token.
# We use a slightly different approach: delete all sessions for this identity
# that match. For a cleaner approach, we accept the token via a dedicated dependency.
from fastapi import Request
# This is handled by getting the token from the auth service
# For simplicity, we delete all sessions for the identity
from sqlalchemy import select
from app.models.user import Session
result = await db.execute(select(Session).where(Session.identity_id == identity.id))
sessions = result.scalars().all()
for session in sessions:
await db.delete(session)
await db.commit()