Files
decision/backend/app/routers/auth.py
Yvv 11e4a4d60a Dev mode: panneau connexion rapide avec 4 profils pre-configures
Ajout d'un panneau dev sous le login (Alice=membre, Bob=forgeron,
Charlie=comite tech, Dave=observateur) pour tester les differents
roles sans keypair Ed25519. Endpoint GET /auth/dev/profiles renvoie
les profils uniquement en ENVIRONMENT=development.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 03:09:40 +01:00

206 lines
7.3 KiB
Python

"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities."""
from __future__ import annotations
import secrets
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Response, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity
from app.schemas.auth import (
ChallengeRequest,
ChallengeResponse,
IdentityOut,
TokenResponse,
VerifyRequest,
)
from app.services.auth_service import (
create_session,
get_current_identity,
get_or_create_identity,
invalidate_session,
)
# Router on which every authentication endpoint defined in this module is registered.
router = APIRouter()
# ── Dev profiles (only available when ENVIRONMENT == "development") ─────────
# Pre-configured identities for quick login while developing. On /verify the
# entry whose "address" matches the request is handed to
# get_or_create_identity() as ``dev_profile``; /dev/profiles returns the list.
DEV_PROFILES: list[dict[str, object]] = [
    # Alice: plain member
    {
        "address": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
        "display_name": "Alice (Membre WoT)",
        "wot_status": "member",
        "is_smith": False,
        "is_techcomm": False,
    },
    # Bob: member with the smith flag
    {
        "address": "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty",
        "display_name": "Bob (Forgeron)",
        "wot_status": "member",
        "is_smith": True,
        "is_techcomm": False,
    },
    # Charlie: member with both the smith and techcomm flags
    {
        "address": "5FLSigC9HGRKVhB9FiEo4Y3koPsNmBmkP7j4bJa3zN7d8tY",
        "display_name": "Charlie (Comite Tech)",
        "wot_status": "member",
        "is_smith": True,
        "is_techcomm": True,
    },
    # Dave: unknown status, no role flags
    {
        "address": "5DAAnrj7VHTznn2AWBemMuyBwZWs6FNFjdyVXUeYum3PTXFy",
        "display_name": "Dave (Observateur)",
        "wot_status": "unknown",
        "is_smith": False,
        "is_techcomm": False,
    },
]
# ── In-memory challenge store (short-lived, no persistence needed) ──────────
# Maps an address to its single outstanding challenge:
#   { address: { "challenge": str, "expires_at": datetime } }
# Requesting a new challenge for an address overwrites the previous entry.
# NOTE(review): this is a plain module-level dict, so it exists per process only;
# confirm the app is not served by several worker processes.
_pending_challenges: dict[str, dict] = {}
def _cleanup_expired_challenges() -> None:
    """Purge every pending challenge whose expiry time lies in the past.

    The module-level ``_pending_challenges`` dict is mutated in place.
    """
    cutoff = datetime.now(timezone.utc)
    # Walk a snapshot: a dict must not change size while it is being iterated.
    for address, entry in tuple(_pending_challenges.items()):
        if entry["expires_at"] < cutoff:
            del _pending_challenges[address]
# ── Routes ──────────────────────────────────────────────────────────────────
@router.post("/challenge", response_model=ChallengeResponse)
async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse:
    """Issue a fresh random challenge for the given Duniter address.

    The challenge is stored in memory under ``payload.address`` (replacing any
    earlier one for that address) together with its expiry time. The client is
    expected to sign it with the private key matching the address and submit
    the result via POST /verify.
    """
    # Drop stale entries before adding a new one.
    _cleanup_expired_challenges()

    nonce = secrets.token_hex(32)
    lifetime = timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS)
    deadline = datetime.now(timezone.utc) + lifetime

    _pending_challenges[payload.address] = {"challenge": nonce, "expires_at": deadline}
    return ChallengeResponse(challenge=nonce, expires_at=deadline)
@router.post("/verify", response_model=TokenResponse)
async def verify_challenge(
    payload: VerifyRequest,
    db: AsyncSession = Depends(get_db),
) -> TokenResponse:
    """Validate a signed challenge and return a session token.

    Steps:
    1. Look up the pending challenge for ``payload.address``.
    2. Reject (and drop) the challenge if it has expired.
    3. Check that the submitted challenge string matches the stored one.
    4. Verify the Ed25519 signature. This is NOT implemented yet: the
       signature is accepted unchecked in development, and the request is
       refused in every other environment.
    5. Consume the challenge so it cannot be replayed.
    6. Create or retrieve the DuniterIdentity (with the matching dev profile
       when running in development).
    7. Create a session and return the bearer token.

    Raises:
        HTTPException: 400 when no challenge is pending, when it has expired,
            or when the challenge string does not match; 501 when signature
            verification would be required (any non-development environment).
    """
    # 1. Retrieve pending challenge
    pending = _pending_challenges.get(payload.address)
    if pending is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Aucun challenge en attente pour cette adresse",
        )

    # 2. Check expiry
    if pending["expires_at"] < datetime.now(timezone.utc):
        del _pending_challenges[payload.address]
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Challenge expire, veuillez en demander un nouveau",
        )

    # 3. Verify challenge string matches
    if pending["challenge"] != payload.challenge:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Challenge invalide",
        )

    # 4. Verify Ed25519 signature
    # TODO: Implement actual Ed25519 verification using substrate-interface:
    #
    #     from substrateinterface import Keypair
    #     keypair = Keypair(ss58_address=payload.address)
    #     if not keypair.verify(payload.challenge.encode(), bytes.fromhex(payload.signature)):
    #         raise HTTPException(status_code=401, detail="Signature invalide")
    #
    # Until then the signature is NOT checked. Fail closed outside development:
    # otherwise anyone could obtain a session for any address.
    if settings.ENVIRONMENT != "development":
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail="Verification de signature Ed25519 non implementee",
        )

    # 5. Consume the challenge
    del _pending_challenges[payload.address]

    # 6. Get or create identity (apply dev profile if available)
    dev_profile = None
    if settings.ENVIRONMENT == "development":
        dev_profile = next((p for p in DEV_PROFILES if p["address"] == payload.address), None)
    identity = await get_or_create_identity(db, payload.address, dev_profile=dev_profile)

    # 7. Create session token
    token = await create_session(db, identity)
    return TokenResponse(
        token=token,
        identity=IdentityOut.model_validate(identity),
    )
@router.get("/dev/profiles")
async def list_dev_profiles():
    """Return the quick-login dev profiles; answer 404 outside development."""
    if settings.ENVIRONMENT == "development":
        return DEV_PROFILES
    # Any other environment: behave as if the route did not exist.
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available")
@router.get("/me", response_model=IdentityOut)
async def get_me(
    identity: DuniterIdentity = Depends(get_current_identity),
) -> IdentityOut:
    """Expose the identity resolved by the ``get_current_identity`` dependency."""
    current = IdentityOut.model_validate(identity)
    return current
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT, response_class=Response, response_model=None)
async def logout(
    db: AsyncSession = Depends(get_db),
    identity: DuniterIdentity = Depends(get_current_identity),
):
    """Log the caller out by deleting every session of their identity.

    The raw bearer token is not available in this handler, so rather than
    invalidating only the current session, all sessions that belong to the
    authenticated identity are deleted and the change is committed. The
    endpoint answers 204 with no body.

    NOTE(review): ``invalidate_session`` is imported at module level but is
    not used here; a per-token logout would presumably go through it --
    confirm its signature before switching.
    """
    # Function-scope imports: these names are not imported at module level.
    from sqlalchemy import select

    from app.models.user import Session

    result = await db.execute(select(Session).where(Session.identity_id == identity.id))
    # Delete through the ORM one object at a time, as the original did.
    for session in result.scalars().all():
        await db.delete(session)
    await db.commit()