Files
decision/backend/app/services/auth_service.py
Yvv 11e4a4d60a Dev mode: panneau connexion rapide avec 4 profils pre-configures
Ajout d'un panneau dev sous le login (Alice=membre, Bob=forgeron,
Charlie=comite tech, Dave=observateur) pour tester les differents
roles sans keypair Ed25519. Endpoint GET /auth/dev/profiles renvoie
les profils uniquement en ENVIRONMENT=development.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 03:09:40 +01:00

120 lines
4.1 KiB
Python

"""Authentication service: challenge generation, token management, current user resolution."""
from __future__ import annotations
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity, Session
# Bearer-token extractor shared by the auth dependencies. auto_error=False makes
# missing credentials arrive as None, so get_current_identity can raise its own 401.
security = HTTPBearer(auto_error=False)
def _hash_token(token: str) -> str:
"""SHA-256 hash of a bearer token for storage."""
return hashlib.sha256(token.encode()).hexdigest()
async def create_session(db: AsyncSession, identity: DuniterIdentity) -> str:
    """Open a session for *identity* and hand back its raw bearer token.

    Only the SHA-256 hash of the token is written to the session row; the raw
    value is returned to the caller. The session expires
    ``settings.TOKEN_EXPIRE_HOURS`` hours after the current UTC time.
    """
    bearer = secrets.token_urlsafe(48)
    expiry = datetime.now(timezone.utc) + timedelta(hours=settings.TOKEN_EXPIRE_HOURS)
    db.add(
        Session(
            token_hash=_hash_token(bearer),
            identity_id=identity.id,
            expires_at=expiry,
        )
    )
    await db.commit()
    return bearer
async def invalidate_session(db: AsyncSession, token: str) -> None:
    """Remove the stored session that corresponds to a raw bearer token.

    The token is hashed before lookup; when no session matches the hash the
    call is a no-op.
    """
    lookup = select(Session).where(Session.token_hash == _hash_token(token))
    stored = (await db.execute(lookup)).scalar_one_or_none()
    if not stored:
        return
    await db.delete(stored)
    await db.commit()
async def get_current_identity(
    db: AsyncSession = Depends(get_db),
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> DuniterIdentity:
    """Dependency: resolve the current authenticated identity from the bearer token.

    The presented token is hashed and matched against a session whose
    ``expires_at`` is still in the future (UTC); the identity referenced by
    that session is then loaded and returned.

    Raises:
        HTTPException: 401 when no credentials were supplied, when no
            unexpired session matches the token, or when the session's
            identity cannot be found. Every 401 carries a
            ``WWW-Authenticate: Bearer`` challenge header.
    """
    # A 401 response is expected to tell the client which scheme to use.
    challenge = {"WWW-Authenticate": "Bearer"}

    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentification requise",
            headers=challenge,
        )

    # Sessions are stored by token hash, so hash the presented token to look it up.
    token_hash = _hash_token(credentials.credentials)
    result = await db.execute(
        select(Session).where(
            Session.token_hash == token_hash,
            Session.expires_at > datetime.now(timezone.utc),
        )
    )
    session = result.scalar_one_or_none()
    if session is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token invalide ou expire",
            headers=challenge,
        )

    result = await db.execute(
        select(DuniterIdentity).where(DuniterIdentity.id == session.identity_id)
    )
    identity = result.scalar_one_or_none()
    if identity is None:
        # The session row exists but points at an identity that is gone.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Identite introuvable",
            headers=challenge,
        )
    return identity
async def get_or_create_identity(
    db: AsyncSession,
    address: str,
    dev_profile: dict | None = None,
) -> DuniterIdentity:
    """Fetch the identity registered for *address*, creating it when absent.

    A non-empty ``dev_profile`` supplies ``display_name``, ``wot_status``,
    ``is_smith`` and ``is_techcomm``. On creation, keys missing from the
    profile fall back to ``None``, ``"unknown"``, ``False`` and ``False``
    respectively; on an existing identity, missing keys leave the current
    value untouched. Without a profile, an existing identity is returned
    as found and a new one is created from the address alone.
    """
    lookup = select(DuniterIdentity).where(DuniterIdentity.address == address)
    identity = (await db.execute(lookup)).scalar_one_or_none()

    if identity is not None:
        if not dev_profile:
            return identity
        # Overlay the dev profile on the stored identity, keeping current
        # values for any key the profile does not mention.
        for field in ("display_name", "wot_status", "is_smith", "is_techcomm"):
            setattr(identity, field, dev_profile.get(field, getattr(identity, field)))
        await db.commit()
        await db.refresh(identity)
        return identity

    attrs: dict = {"address": address}
    if dev_profile:
        creation_defaults = {
            "display_name": None,
            "wot_status": "unknown",
            "is_smith": False,
            "is_techcomm": False,
        }
        attrs.update(
            {key: dev_profile.get(key, fallback) for key, fallback in creation_defaults.items()}
        )
    identity = DuniterIdentity(**attrs)
    db.add(identity)
    await db.commit()
    await db.refresh(identity)
    return identity