Files
sejeteralo/backend/app/services/auth_service.py
Yvv b30e54a8f7 Initial commit: SejeteralO water tarification platform
Full-stack app for participatory water pricing using Bezier curves.
- Backend: FastAPI + SQLAlchemy + SQLite with JWT auth
- Frontend: Nuxt 4 + TypeScript with interactive SVG editor
- Math engine: cubic Bezier tarification with Cardano solver
- Admin: commune management, household import, vote monitoring, CMS
- Citizen: interactive curve editor, vote submission
- Docker-compose deployment ready

Includes fixes for:
- Impact table snake_case/camelCase property mismatch
- CMS content backend API + frontend editor (was stub)
- Admin route protection middleware
- Public content display on commune page
- Vote confirmation page link fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 15:26:02 +01:00

90 lines
2.9 KiB
Python

"""Authentication service: JWT creation/validation, password hashing."""
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from passlib.context import CryptContext
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import settings
from app.database import get_db
from app.models import AdminUser, Household
# Shared passlib context used for hashing and verifying passwords;
# bcrypt is the only configured scheme.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)

# HTTP bearer-credentials extractor, injected via Depends() in the
# authentication dependencies of this module.
security = HTTPBearer()
def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def verify_password(plain: str, hashed: str) -> bool:
    """Check the plain-text password *plain* against the stored hash *hashed*.

    Delegates to ``pwd_context.verify`` and returns its result.
    """
    matches = pwd_context.verify(plain, hashed)
    return matches
def create_token(data: dict, expires_hours: int) -> str:
    """Encode *data* as a signed JWT that expires after *expires_hours* hours.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so
            the caller's object is not modified.
        expires_hours: Lifetime of the token, in hours, counted from now.

    Returns:
        The encoded token, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.
    """
    claims = data.copy()
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # and yields a naive datetime that is easy to misinterpret as local time.
    issued_at = datetime.now(timezone.utc)
    claims["exp"] = issued_at + timedelta(hours=expires_hours)
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_admin_token(admin: AdminUser) -> str:
    """Build a JWT for *admin*.

    The token carries the admin's id (as a string) in ``sub``, the admin's
    role in ``role`` and the marker ``type == "admin"``; its lifetime is
    ``settings.ADMIN_TOKEN_EXPIRE_HOURS``.
    """
    claims = {
        "sub": str(admin.id),
        "role": admin.role,
        "type": "admin",
    }
    return create_token(claims, settings.ADMIN_TOKEN_EXPIRE_HOURS)
def create_citizen_token(household: Household, commune_slug: str) -> str:
    """Build a JWT for a citizen *household*.

    The token carries the household id (as a string) in ``sub``, the
    household's ``commune_id``, the given *commune_slug* and the marker
    ``type == "citizen"``; its lifetime is
    ``settings.CITIZEN_TOKEN_EXPIRE_HOURS``.
    """
    claims = {
        "sub": str(household.id),
        "commune_id": household.commune_id,
        "commune_slug": commune_slug,
        "type": "citizen",
    }
    return create_token(claims, settings.CITIZEN_TOKEN_EXPIRE_HOURS)
def decode_token(token: str) -> dict:
    """Decode and validate a JWT, returning its claims.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded claims dict.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when decoding
            raises ``JWTError``.
    """
    try:
        return jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError as exc:
        # Chain the original error for debugging and advertise the expected
        # auth scheme, as a 401 response to a bearer request should.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
async def get_current_admin(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> AdminUser:
    """Resolve the bearer token of the request to an ``AdminUser``.

    Args:
        credentials: Bearer credentials extracted by ``security``.
        db: Database session used to load the admin.

    Returns:
        The ``AdminUser`` whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, if its ``sub``
            claim is missing or not an integer, or if no admin with that id
            exists; 403 if the token's ``type`` claim is not ``"admin"``.
    """
    payload = decode_token(credentials.credentials)
    if payload.get("type") != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")

    # A correctly signed token may still lack a usable subject; report that
    # as an authentication failure instead of letting it surface as a 500.
    try:
        admin_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from None

    admin = await db.get(AdminUser, admin_id)
    if admin is None:
        raise HTTPException(status_code=401, detail="Admin not found")
    return admin
async def get_current_citizen(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> Household:
    """Resolve the bearer token of the request to a ``Household``.

    Args:
        credentials: Bearer credentials extracted by ``security``.
        db: Database session used to load the household.

    Returns:
        The ``Household`` whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded, if its ``sub``
            claim is missing or not an integer, or if no household with that
            id exists; 403 if the token's ``type`` claim is not
            ``"citizen"``.
    """
    payload = decode_token(credentials.credentials)
    if payload.get("type") != "citizen":
        raise HTTPException(status_code=403, detail="Citizen access required")

    # A correctly signed token may still lack a usable subject; report that
    # as an authentication failure instead of letting it surface as a 500.
    try:
        household_id = int(payload["sub"])
    except (KeyError, TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from None

    household = await db.get(Household, household_id)
    if household is None:
        raise HTTPException(status_code=401, detail="Household not found")
    return household
def require_super_admin(admin: AdminUser = Depends(get_current_admin)) -> AdminUser:
    """Allow the request only when the authenticated admin is a super admin.

    Returns the admin unchanged when ``admin.role`` equals ``"super_admin"``;
    otherwise raises a 403 ``HTTPException``.
    """
    if admin.role == "super_admin":
        return admin
    raise HTTPException(status_code=403, detail="Super admin access required")