Plateforme de decisions collectives pour Duniter/G1. Backend FastAPI async + PostgreSQL (14 tables, 8 routers, 6 services, moteur de vote avec formule d'inertie WoT/Smith/TechComm). Frontend Nuxt 4 + Nuxt UI v3 + Pinia (9 pages, 5 stores). Infrastructure Docker + Woodpecker CI + Traefik. Documentation technique et utilisateur (15 fichiers). Seed : Licence G1, Engagement Forgeron v2.0.0, 4 protocoles de vote. 30 tests unitaires (formules, mode params, vote nuance) -- tous verts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
307 lines
10 KiB
Python
307 lines
10 KiB
Python
"""Votes router: vote sessions, individual votes, and result computation."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, status
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.database import get_db
|
|
from app.models.protocol import FormulaConfig, VotingProtocol
|
|
from app.models.user import DuniterIdentity
|
|
from app.models.vote import Vote, VoteSession
|
|
from app.schemas.vote import VoteCreate, VoteOut, VoteSessionCreate, VoteSessionOut
|
|
from app.services.auth_service import get_current_identity
|
|
|
|
# Router instance on which the vote-session endpoints of this module are registered.
router = APIRouter()
|
|
|
|
|
|
# ── Helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
async def _get_session(db: AsyncSession, session_id: uuid.UUID) -> VoteSession:
    """Load one vote session together with its votes.

    Args:
        db: Async database session used to run the query.
        session_id: Primary key of the vote session to load.

    Returns:
        The matching ``VoteSession``; its ``votes`` relationship is loaded
        eagerly through ``selectinload``.

    Raises:
        HTTPException: 404 when no vote session has this ID.
    """
    query = (
        select(VoteSession)
        .where(VoteSession.id == session_id)
        .options(selectinload(VoteSession.votes))
    )
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Session de vote introuvable",
    )
|
|
|
|
|
|
async def _get_protocol_with_formula(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProtocol:
    """Load one voting protocol together with its formula configuration.

    Args:
        db: Async database session used to run the query.
        protocol_id: Primary key of the voting protocol to load.

    Returns:
        The matching ``VotingProtocol``; its ``formula_config`` relationship
        is loaded eagerly through ``selectinload``.

    Raises:
        HTTPException: 404 when no voting protocol has this ID.
    """
    query = (
        select(VotingProtocol)
        .where(VotingProtocol.id == protocol_id)
        .options(selectinload(VotingProtocol.formula_config))
    )
    found = (await db.execute(query)).scalar_one_or_none()
    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Protocole de vote introuvable",
    )
|
|
|
|
|
|
def _compute_threshold(formula: FormulaConfig, wot_size: int, votes_total: int) -> float:
    """Compute the WoT-based adoption threshold for a given turnout.

    Result = C + B^W + (M + (1-M) * (1 - R^G)) * max(0, T-C)

    Where:
    - C = formula.constant_base
    - B = formula.base_exponent (the B^W term is taken as 0 when B <= 0)
    - W = max(wot_size, 1), so a zero WoT size never causes a division by zero
    - M = formula.majority_pct / 100
    - G = formula.gradient_exponent
    - T = votes_total (turnout)
    - R = T / W clamped to the range [0, 1]

    Args:
        formula: Object exposing ``constant_base``, ``base_exponent``,
            ``majority_pct`` and ``gradient_exponent``.
        wot_size: Size of the Web of Trust used as the turnout denominator.
        votes_total: Number of votes cast so far.

    Returns:
        The number of "for" votes required by the formula.
    """
    c = formula.constant_base
    b = formula.base_exponent
    m = formula.majority_pct / 100.0
    g = formula.gradient_exponent
    t = votes_total
    w = max(wot_size, 1)

    base_power = b ** w if b > 0 else 0.0

    # Clamp on both sides: above 1 the inertia term would go below the plain
    # majority, and a negative ratio raised to a fractional exponent would
    # silently turn the threshold into a complex number.
    turnout_ratio = min(max(t / w, 0.0), 1.0)

    # Inertia goes from 1 (no turnout) down to M (full turnout).
    inertia = m + (1 - m) * (1 - turnout_ratio ** g)

    return c + base_power + inertia * max(0, t - c)
|
|
|
|
|
|
def _compute_result(
    session: VoteSession,
    formula: FormulaConfig,
) -> dict:
    """Decide whether a vote session is adopted under ``formula``.

    The main criterion compares ``session.votes_for`` with the threshold
    returned by ``_compute_threshold``. The Smith and TechComm criteria are
    optional: each one is evaluated only when the formula defines the matching
    exponent and the session's group size is positive. A criterion that is not
    evaluated counts as satisfied and its threshold is reported as ``None``.
    An evaluated group threshold is ``ceil(group_size ** exponent)``.

    Args:
        session: Vote session carrying the tallies and group sizes.
        formula: Formula configuration of the session's voting protocol.

    Returns:
        A dict with the keys ``threshold_required``, ``result`` ("adopted" or
        "rejected"), ``smith_threshold``, ``smith_pass``,
        ``techcomm_threshold`` and ``techcomm_pass``.
    """
    required_votes = _compute_threshold(formula, session.wot_size, session.votes_total)
    main_ok = session.votes_for >= required_votes

    # Smith criterion: skipped (treated as met) unless configured and sized.
    smith_required, smith_ok = None, True
    if formula.smith_exponent is not None and session.smith_size > 0:
        smith_required = math.ceil(session.smith_size ** formula.smith_exponent)
        smith_ok = session.smith_votes_for >= smith_required

    # TechComm criterion: same rule as the Smith criterion.
    techcomm_required, techcomm_ok = None, True
    if formula.techcomm_exponent is not None and session.techcomm_size > 0:
        techcomm_required = math.ceil(session.techcomm_size ** formula.techcomm_exponent)
        techcomm_ok = session.techcomm_votes_for >= techcomm_required

    # Adoption requires every criterion to hold at once.
    verdict = "adopted" if all((main_ok, smith_ok, techcomm_ok)) else "rejected"

    return dict(
        threshold_required=required_votes,
        result=verdict,
        smith_threshold=smith_required,
        smith_pass=smith_ok,
        techcomm_threshold=techcomm_required,
        techcomm_pass=techcomm_ok,
    )
|
|
|
|
|
|
# ── Routes ──────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@router.post("/sessions", response_model=VoteSessionOut, status_code=status.HTTP_201_CREATED)
async def create_vote_session(
    payload: VoteSessionCreate,
    db: AsyncSession = Depends(get_db),
    identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteSessionOut:
    """Create a vote session that opens immediately.

    The session starts now (UTC) and ends ``duration_days`` later, where the
    duration comes from the formula config of the referenced voting protocol.
    WoT/Smith/TechComm sizes are meant to be snapshotted from the blockchain
    at creation time; for now they are stored as 0 placeholders.

    Args:
        payload: Decision, item version and voting protocol to attach.
        db: Async database session.
        identity: Resolved by ``get_current_identity``; not read in the body.

    Returns:
        The persisted session serialized as ``VoteSessionOut``.

    Raises:
        HTTPException: 404 when the voting protocol does not exist.
    """
    # Looking the protocol up both validates the ID and yields the duration.
    protocol = await _get_protocol_with_formula(db, payload.voting_protocol_id)
    formula_config = protocol.formula_config

    opened_at = datetime.now(timezone.utc)
    voting_window = timedelta(days=formula_config.duration_days)

    new_session = VoteSession(
        decision_id=payload.decision_id,
        item_version_id=payload.item_version_id,
        voting_protocol_id=payload.voting_protocol_id,
        starts_at=opened_at,
        ends_at=opened_at + voting_window,
        # TODO: Snapshot actual WoT sizes from blockchain via Duniter RPC
        wot_size=0,
        smith_size=0,
        techcomm_size=0,
    )
    db.add(new_session)
    await db.commit()
    # Reload so database-generated values are available for serialization.
    await db.refresh(new_session)

    return VoteSessionOut.model_validate(new_session)
|
|
|
|
|
|
@router.get("/sessions/{id}", response_model=VoteSessionOut)
async def get_vote_session(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
) -> VoteSessionOut:
    """Return one vote session, including its current tallies.

    Args:
        id: Vote session ID taken from the ``{id}`` path segment.
        db: Async database session.

    Returns:
        The session serialized as ``VoteSessionOut``.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    return VoteSessionOut.model_validate(await _get_session(db, id))
|
|
|
|
|
|
def _apply_vote_to_tallies(
    session: VoteSession,
    vote_value: str,
    is_smith: bool,
    is_techcomm: bool,
    delta: int,
) -> None:
    """Add ``delta`` (+1 or -1) to every session counter that one vote feeds.

    Every vote counts towards ``votes_total``. A "for" vote also counts
    towards ``votes_for`` and, depending on the voter's flags, towards the
    Smith and TechComm "for" counters. An "against" vote counts towards
    ``votes_against``. Any other value only affects the total.
    """
    session.votes_total += delta
    if vote_value == "for":
        session.votes_for += delta
        if is_smith:
            session.smith_votes_for += delta
        if is_techcomm:
            session.techcomm_votes_for += delta
    elif vote_value == "against":
        session.votes_against += delta


@router.post("/sessions/{id}/vote", response_model=VoteOut, status_code=status.HTTP_201_CREATED)
async def submit_vote(
    id: uuid.UUID,
    payload: VoteCreate,
    db: AsyncSession = Depends(get_db),
    identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteOut:
    """Submit a vote to a session.

    Each identity has at most one active vote per session. Submitting again
    deactivates the previous vote (the row is kept as an audit trail), removes
    it from the session tallies and records the new vote in its place.
    The signature and signed payload are stored as provided; this handler
    does not verify them.

    Args:
        id: Vote session ID taken from the ``{id}`` path segment.
        payload: Vote value, optional nuance/comment, signature data.
        db: Async database session.
        identity: Authenticated voter; its WoT/Smith/TechComm flags are
            copied onto the vote.

    Returns:
        The newly created vote serialized as ``VoteOut``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when it is
            not open or its end date has passed.
    """
    session = await _get_session(db, id)

    if session.status != "open":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cette session de vote n'est pas ouverte",
        )

    if datetime.now(timezone.utc) > session.ends_at:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cette session de vote est terminee",
        )

    # Only ACTIVE votes may be replaced. Deactivated rows stay in the table,
    # so matching on (session, voter) alone returns several rows from the
    # second replacement onwards and would subtract stale votes again.
    previous_result = await db.execute(
        select(Vote).where(
            Vote.session_id == session.id,
            Vote.voter_id == identity.id,
            Vote.is_active.is_(True),
        )
    )
    for previous_vote in previous_result.scalars().all():
        # Deactivate instead of deleting: the row is kept for the audit trail.
        previous_vote.is_active = False
        # Use the flags recorded on the old vote, not the voter's current ones,
        # so exactly what was added at the time gets removed.
        _apply_vote_to_tallies(
            session,
            previous_vote.vote_value,
            previous_vote.voter_is_smith,
            previous_vote.voter_is_techcomm,
            -1,
        )

    vote = Vote(
        session_id=session.id,
        voter_id=identity.id,
        vote_value=payload.vote_value,
        nuanced_level=payload.nuanced_level,
        comment=payload.comment,
        signature=payload.signature,
        signed_payload=payload.signed_payload,
        voter_wot_status=identity.wot_status,
        voter_is_smith=identity.is_smith,
        voter_is_techcomm=identity.is_techcomm,
    )
    db.add(vote)

    _apply_vote_to_tallies(
        session,
        payload.vote_value,
        identity.is_smith,
        identity.is_techcomm,
        1,
    )

    await db.commit()
    await db.refresh(vote)

    return VoteOut.model_validate(vote)
|
|
|
|
|
|
@router.get("/sessions/{id}/votes", response_model=list[VoteOut])
async def list_votes(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
    active_only: bool = Query(default=True, description="Ne montrer que les votes actifs"),
) -> list[VoteOut]:
    """List the votes of a session, oldest first.

    Args:
        id: Vote session ID taken from the ``{id}`` path segment.
        db: Async database session.
        active_only: When true (the default), votes whose ``is_active`` flag
            is not true are left out.

    Returns:
        The matching votes ordered by ascending ``created_at``.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    # Called only for its 404 behaviour; the returned session is not used.
    await _get_session(db, id)

    criteria = [Vote.session_id == id]
    if active_only:
        criteria.append(Vote.is_active.is_(True))

    query = select(Vote).where(*criteria).order_by(Vote.created_at.asc())
    rows = (await db.execute(query)).scalars().all()

    return [VoteOut.model_validate(row) for row in rows]
|
|
|
|
|
|
@router.get("/sessions/{id}/result")
async def get_vote_result(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Compute and return the current result of a vote session.

    The outcome is recomputed on every call from the session's stored tallies
    and the formula config of its voting protocol; nothing is written back.

    Args:
        id: Vote session ID taken from the ``{id}`` path segment.
        db: Async database session.

    Returns:
        A dict with the session ID (as a string), its status, the raw tallies
        and group sizes, followed by every key produced by ``_compute_result``.

    Raises:
        HTTPException: 404 when the session or its voting protocol is missing.
    """
    session = await _get_session(db, id)
    protocol = await _get_protocol_with_formula(db, session.voting_protocol_id)
    outcome = _compute_result(session, protocol.formula_config)

    # Session attributes copied into the response unchanged, in output order.
    passthrough_fields = (
        "status",
        "votes_for",
        "votes_against",
        "votes_total",
        "wot_size",
        "smith_size",
        "techcomm_size",
        "smith_votes_for",
        "techcomm_votes_for",
    )
    response = {"session_id": str(session.id)}
    for field_name in passthrough_fields:
        response[field_name] = getattr(session, field_name)
    response.update(outcome)
    return response
|