Ajout d'un panneau dev sous le login (Alice=membre, Bob=forgeron, Charlie=comite tech, Dave=observateur) pour tester les differents roles sans keypair Ed25519. Endpoint GET /auth/dev/profiles renvoie les profils uniquement en ENVIRONMENT=development. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
120 lines
4.1 KiB
Python
120 lines
4.1 KiB
Python
"""Authentication service: challenge generation, token management, current user resolution."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import secrets
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.database import get_db
|
|
from app.models.user import DuniterIdentity, Session
|
|
|
|
security = HTTPBearer(auto_error=False)
|
|
|
|
|
|
def _hash_token(token: str) -> str:
|
|
"""SHA-256 hash of a bearer token for storage."""
|
|
return hashlib.sha256(token.encode()).hexdigest()
|
|
|
|
|
|
async def create_session(db: AsyncSession, identity: DuniterIdentity) -> str:
|
|
"""Create a new session for the given identity, return the raw bearer token."""
|
|
raw_token = secrets.token_urlsafe(48)
|
|
token_hash = _hash_token(raw_token)
|
|
|
|
session = Session(
|
|
token_hash=token_hash,
|
|
identity_id=identity.id,
|
|
expires_at=datetime.now(timezone.utc) + timedelta(hours=settings.TOKEN_EXPIRE_HOURS),
|
|
)
|
|
db.add(session)
|
|
await db.commit()
|
|
|
|
return raw_token
|
|
|
|
|
|
async def invalidate_session(db: AsyncSession, token: str) -> None:
|
|
"""Delete the session matching the given raw token."""
|
|
token_hash = _hash_token(token)
|
|
result = await db.execute(select(Session).where(Session.token_hash == token_hash))
|
|
session = result.scalar_one_or_none()
|
|
if session:
|
|
await db.delete(session)
|
|
await db.commit()
|
|
|
|
|
|
async def get_current_identity(
|
|
db: AsyncSession = Depends(get_db),
|
|
credentials: HTTPAuthorizationCredentials | None = Depends(security),
|
|
) -> DuniterIdentity:
|
|
"""Dependency: resolve the current authenticated identity from the bearer token.
|
|
|
|
Raises 401 if the token is missing, invalid, or expired.
|
|
"""
|
|
if credentials is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise")
|
|
|
|
token_hash = _hash_token(credentials.credentials)
|
|
result = await db.execute(
|
|
select(Session).where(
|
|
Session.token_hash == token_hash,
|
|
Session.expires_at > datetime.now(timezone.utc),
|
|
)
|
|
)
|
|
session = result.scalar_one_or_none()
|
|
|
|
if session is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide ou expire")
|
|
|
|
result = await db.execute(select(DuniterIdentity).where(DuniterIdentity.id == session.identity_id))
|
|
identity = result.scalar_one_or_none()
|
|
|
|
if identity is None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Identite introuvable")
|
|
|
|
return identity
|
|
|
|
|
|
async def get_or_create_identity(
|
|
db: AsyncSession,
|
|
address: str,
|
|
dev_profile: dict | None = None,
|
|
) -> DuniterIdentity:
|
|
"""Get an existing identity by address or create a new one.
|
|
|
|
If dev_profile is provided, apply the profile attributes on create or update.
|
|
"""
|
|
result = await db.execute(select(DuniterIdentity).where(DuniterIdentity.address == address))
|
|
identity = result.scalar_one_or_none()
|
|
|
|
if identity is None:
|
|
kwargs: dict = {"address": address}
|
|
if dev_profile:
|
|
kwargs.update({
|
|
"display_name": dev_profile.get("display_name"),
|
|
"wot_status": dev_profile.get("wot_status", "unknown"),
|
|
"is_smith": dev_profile.get("is_smith", False),
|
|
"is_techcomm": dev_profile.get("is_techcomm", False),
|
|
})
|
|
identity = DuniterIdentity(**kwargs)
|
|
db.add(identity)
|
|
await db.commit()
|
|
await db.refresh(identity)
|
|
elif dev_profile:
|
|
# Update existing identity with dev profile data
|
|
identity.display_name = dev_profile.get("display_name", identity.display_name)
|
|
identity.wot_status = dev_profile.get("wot_status", identity.wot_status)
|
|
identity.is_smith = dev_profile.get("is_smith", identity.is_smith)
|
|
identity.is_techcomm = dev_profile.get("is_techcomm", identity.is_techcomm)
|
|
await db.commit()
|
|
await db.refresh(identity)
|
|
|
|
return identity
|