"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities."""
from __future__ import annotations
import secrets
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, Response, status
from sqlalchemy import or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity
from app.schemas.auth import (
ChallengeRequest,
ChallengeResponse,
IdentityOut,
TokenResponse,
VerifyRequest,
)
from app.services.auth_service import (
create_session,
get_current_identity,
get_or_create_identity,
invalidate_session,
)
# Router collecting every authentication route declared in this module.
router = APIRouter()

# ── Dev profiles ────────────────────────────────────────────────────────────
# Hard-coded demo identities. They are served by the /dev/profiles route and
# are accepted by the /verify route without a signature check whenever
# ENVIRONMENT == "development" or DEMO_MODE is enabled.
DEV_PROFILES = [
    {
        "address": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY",
        "display_name": "Alice (Membre WoT)",
        "wot_status": "member",
        "is_smith": False,
        "is_techcomm": False,
    },
    {
        "address": "5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty",
        "display_name": "Bob (Forgeron)",
        "wot_status": "member",
        "is_smith": True,
        "is_techcomm": False,
    },
    {
        "address": "5FLSigC9HGRKVhB9FiEo4Y3koPsNmBmkP7j4bJa3zN7d8tY",
        "display_name": "Charlie (Référent structure)",
        "wot_status": "member",
        "is_smith": True,
        "is_techcomm": True,
    },
    {
        "address": "5DAAnrj7VHTznn2AWBemMuyBwZWs6FNFjdyVXUeYum3PTXFy",
        "display_name": "Dave (Auteur)",
        "wot_status": "member",
        "is_smith": False,
        "is_techcomm": False,
    },
]

# ── In-memory challenge store (short-lived, no persistence needed) ──────────
# Structure: { address: { "challenge": str, "expires_at": datetime } }
# Keyed by address, so requesting a new challenge for an address overwrites
# the one that was previously pending for it.
_pending_challenges: dict[str, dict] = {}
def _cleanup_expired_challenges() -> None:
    """Drop every pending challenge whose expiry time is already in the past."""
    cutoff = datetime.now(timezone.utc)
    # Snapshot the stale keys first: the dict must not change size while it is
    # being iterated.
    stale_addresses = tuple(
        address
        for address, entry in _pending_challenges.items()
        if entry["expires_at"] < cutoff
    )
    for address in stale_addresses:
        _pending_challenges.pop(address)
# ── Routes ──────────────────────────────────────────────────────────────────
@router.post("/challenge", response_model=ChallengeResponse)
async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse:
    """Issue a fresh random challenge for the address given in the payload.

    Expired entries are purged first, then a 32-byte hex token is stored for
    the address (replacing any challenge already pending for it) together with
    its expiry time, which is ``settings.CHALLENGE_EXPIRE_SECONDS`` from now.
    The client is expected to sign the challenge and submit it to the
    ``/verify`` route.
    """
    _cleanup_expired_challenges()

    token = secrets.token_hex(32)
    lifetime = timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS)
    deadline = datetime.now(timezone.utc) + lifetime

    entry = {"challenge": token, "expires_at": deadline}
    _pending_challenges[payload.address] = entry

    return ChallengeResponse(challenge=token, expires_at=deadline)
def _signature_matches(address: str, message: bytes, signature: bytes) -> bool:
    """Return True when *signature* over *message* verifies for *address*.

    Sr25519 is tried first (default Substrate/Cesium2), then Ed25519
    (Duniter v1 migration). Any exception raised while building the keypair or
    verifying with one key type is treated as "no match" for that key type.
    """
    # Function-scope import, as in the original code: substrateinterface is
    # only loaded when a signature actually has to be checked.
    from substrateinterface import Keypair, KeypairType

    for key_type in (KeypairType.SR25519, KeypairType.ED25519):
        try:
            keypair = Keypair(ss58_address=address, crypto_type=key_type)
            if keypair.verify(message, signature):
                return True
        except Exception:
            # Deliberate best-effort: a failure with one scheme must not
            # prevent the other scheme from being tried.
            continue
    return False


@router.post("/verify", response_model=TokenResponse)
async def verify_challenge(
    payload: VerifyRequest,
    db: AsyncSession = Depends(get_db),
) -> TokenResponse:
    """Verify a signed challenge and return a session token.

    Steps:
    1. Check that a pending challenge exists for the address.
    2. Reject (and drop) the challenge if it has expired.
    3. Check that the submitted challenge string matches the stored one.
    4. Verify the signature against the address (Sr25519, then Ed25519),
       unless the address is a dev profile and demo/development mode is on.
    5. Consume the challenge so it cannot be reused.
    6. Create or retrieve the DuniterIdentity (applying the dev profile, if any).
    7. Create a session and return the bearer token.

    Raises:
        HTTPException: 400 when no challenge is pending, the challenge has
            expired, the challenge string does not match, or the signature is
            not valid hex; 401 when the signature does not verify.
    """
    address = payload.address

    # 1. Retrieve pending challenge
    pending = _pending_challenges.get(address)
    if pending is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Aucun challenge en attente pour cette adresse",
        )

    # 2. Check expiry
    if pending["expires_at"] < datetime.now(timezone.utc):
        del _pending_challenges[address]
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Challenge expire, veuillez en demander un nouveau",
        )

    # 3. Verify challenge string matches
    if pending["challenge"] != payload.challenge:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Challenge invalide",
        )

    # Look the dev profile up once: it drives both the signature bypass
    # (step 4) and the identity creation (step 6).
    demo_enabled = settings.DEMO_MODE or settings.ENVIRONMENT == "development"
    dev_profile = None
    if demo_enabled:
        dev_profile = next((p for p in DEV_PROFILES if p["address"] == address), None)

    # 4. Verify signature
    # TODO: trustWallet — delegate verification to the trustWallet protocol (librodrome).
    # Once trustWallet is available: replace the block below with a verification
    # of the signed token provided by trustWallet (JWT or Ed25519 proof via
    # iframe postMessage). The DEMO_MODE bypass will then be removed.
    if dev_profile is None:
        # The signed message is the UTF-8 encoded challenge string.
        # NOTE(review): the original comment states that polkadot.js / Cesium2
        # signRaw(type='bytes') wraps the payload — confirm that the wrapped
        # form is accepted by the keypair verification.
        message = f"{payload.challenge}".encode("utf-8")
        sig_hex = payload.signature.removeprefix("0x")
        try:
            sig_bytes = bytes.fromhex(sig_hex)
        except ValueError:
            # The parsing error adds nothing for the client; suppress chaining.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Format de signature invalide (hex attendu)",
            ) from None

        if not _signature_matches(address, message, sig_bytes):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Signature invalide",
            )

    # 5. Consume the challenge
    del _pending_challenges[address]

    # 6. Get or create identity (apply dev profile if available)
    identity = await get_or_create_identity(db, address, dev_profile=dev_profile)

    # 7. Create session token
    token = await create_session(db, identity)
    return TokenResponse(
        token=token,
        identity=IdentityOut.model_validate(identity),
    )
@router.get("/dev/profiles")
async def list_dev_profiles():
    """Return the demo profiles offered for quick login.

    Only served when ENVIRONMENT is "development" or DEMO_MODE is enabled;
    otherwise the route answers 404.
    """
    if settings.ENVIRONMENT == "development" or settings.DEMO_MODE:
        return DEV_PROFILES
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Not available")
@router.get("/me", response_model=IdentityOut)
async def get_me(
    identity: DuniterIdentity = Depends(get_current_identity),
) -> IdentityOut:
    """Serialize the identity resolved by the ``get_current_identity`` dependency."""
    current = IdentityOut.model_validate(identity)
    return current
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT, response_class=Response, response_model=None)
async def logout(
    db: AsyncSession = Depends(get_db),
    identity: DuniterIdentity = Depends(get_current_identity),
):
    """Log the authenticated identity out by deleting all of its sessions.

    This handler only receives the identity resolved by
    ``get_current_identity``, not the raw bearer token, so it cannot target the
    single session that issued the request. Every session row whose
    ``identity_id`` matches the identity is deleted instead, which ends all of
    the identity's sessions, not just the current one.
    """
    # Function-scope import kept from the original code.
    from app.models.user import Session

    result = await db.execute(select(Session).where(Session.identity_id == identity.id))
    for session in result.scalars().all():
        await db.delete(session)
    await db.commit()
# Escape character used in the LIKE patterns built by search_identities.
_LIKE_ESCAPE = "/"


def _escape_like(term: str, escape: str = _LIKE_ESCAPE) -> str:
    """Escape LIKE wildcards (``%``, ``_``) and the escape character in *term*."""
    return (
        term.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


@router.get("/identities", response_model=list[IdentityOut])
async def search_identities(
    q: str = Query(..., min_length=1, description="Recherche par adresse ou nom"),
    limit: int = Query(default=10, ge=1, le=50),
    db: AsyncSession = Depends(get_db),
) -> list[IdentityOut]:
    """Search Duniter identities by address prefix or display_name substring.

    Matching is case-insensitive. ``q`` is matched literally: ``%`` and ``_``
    in the search term are escaped so they are not interpreted as LIKE
    wildcards (otherwise ``q="%"`` would match every identity).

    Args:
        q: Search term, at least one character.
        limit: Maximum number of results (1 to 50, default 10).
        db: Database session.

    Returns:
        Matching identities ordered by display_name, at most ``limit`` items.
    """
    literal = _escape_like(q)
    result = await db.execute(
        select(DuniterIdentity)
        .where(
            or_(
                DuniterIdentity.address.ilike(f"{literal}%", escape=_LIKE_ESCAPE),
                DuniterIdentity.display_name.ilike(f"%{literal}%", escape=_LIKE_ESCAPE),
            )
        )
        .order_by(DuniterIdentity.display_name)
        .limit(limit)
    )
    return [IdentityOut.model_validate(i) for i in result.scalars().all()]