Sprint 3 : protocoles de vote et boite a outils

Backend:
- Sessions de vote : list, close, tally, threshold details, auto-expiration
- Protocoles : update, simulate, meta-gouvernance, formulas CRUD
- Service vote enrichi : close_session, get_threshold_details, nuanced breakdown
- Schemas : ThresholdDetailOut, VoteResultOut, FormulaSimulationRequest/Result
- WebSocket broadcast sur chaque vote + fermeture session
- 25 nouveaux tests (threshold details, close, nuanced, simulation)

Frontend:
- 5 composants vote : VoteBinary, VoteNuanced, ThresholdGauge, FormulaDisplay, VoteHistory
- 3 composants protocoles : ProtocolPicker, FormulaEditor, ModeParamsDisplay
- Simulateur de formules interactif (page /protocols/formulas)
- Page detail protocole (/protocols/[id])
- Composable useWebSocket (live updates)
- Composable useVoteFormula (calcul client-side reactif)
- Integration KaTeX pour rendu LaTeX des formules

Documentation:
- API reference : 8 nouveaux endpoints documentes
- Formules : tables d'inertie, parametres detailles, simulation API
- Guide vote : vote binaire/nuance, jauge, historique, simulateur, meta-gouvernance

55 tests passes (+ 1 skipped), 126 fichiers total.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Yvv
2026-02-28 13:29:31 +01:00
parent 2bdc731639
commit cede2a585f
25 changed files with 3964 additions and 188 deletions

View File

@@ -1,4 +1,4 @@
"""Protocols router: voting protocols and formula configurations."""
"""Protocols router: voting protocols, formula configurations, and simulation."""
from __future__ import annotations
@@ -10,13 +10,20 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.engine.smith_threshold import smith_threshold as compute_smith_threshold
from app.engine.techcomm_threshold import techcomm_threshold as compute_techcomm_threshold
from app.engine.threshold import wot_threshold as compute_wot_threshold
from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.user import DuniterIdentity
from app.schemas.protocol import (
FormulaConfigCreate,
FormulaConfigOut,
FormulaConfigUpdate,
FormulaSimulationRequest,
FormulaSimulationResult,
VotingProtocolCreate,
VotingProtocolOut,
VotingProtocolUpdate,
)
from app.services.auth_service import get_current_identity
@@ -39,6 +46,17 @@ async def _get_protocol(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProto
return protocol
async def _get_formula(db: AsyncSession, formula_id: uuid.UUID) -> FormulaConfig:
"""Fetch a formula config by ID, or raise 404."""
result = await db.execute(
select(FormulaConfig).where(FormulaConfig.id == formula_id)
)
formula = result.scalar_one_or_none()
if formula is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Configuration de formule introuvable")
return formula
# ── Voting Protocol routes ──────────────────────────────────────────────────
@@ -102,6 +120,30 @@ async def get_protocol(
return VotingProtocolOut.model_validate(protocol)
@router.put("/{id}", response_model=VotingProtocolOut)
async def update_protocol(
id: uuid.UUID,
payload: VotingProtocolUpdate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VotingProtocolOut:
"""Update a voting protocol (meta-governance).
Only provided fields will be updated. Requires authentication.
"""
protocol = await _get_protocol(db, id)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(protocol, field, value)
await db.commit()
# Reload with formula config
protocol = await _get_protocol(db, protocol.id)
return VotingProtocolOut.model_validate(protocol)
# ── Formula Config routes ───────────────────────────────────────────────────
@@ -137,3 +179,90 @@ async def create_formula(
await db.refresh(formula)
return FormulaConfigOut.model_validate(formula)
@router.get("/formulas/{id}", response_model=FormulaConfigOut)
async def get_formula(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> FormulaConfigOut:
"""Get a single formula configuration."""
formula = await _get_formula(db, id)
return FormulaConfigOut.model_validate(formula)
@router.put("/formulas/{id}", response_model=FormulaConfigOut)
async def update_formula(
id: uuid.UUID,
payload: FormulaConfigUpdate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> FormulaConfigOut:
"""Update a formula configuration (meta-governance).
Only provided fields will be updated. Requires authentication.
"""
formula = await _get_formula(db, id)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(formula, field, value)
await db.commit()
await db.refresh(formula)
return FormulaConfigOut.model_validate(formula)
# ── Simulation ──────────────────────────────────────────────────────────────
@router.post("/simulate", response_model=FormulaSimulationResult)
async def simulate_formula(
payload: FormulaSimulationRequest,
) -> FormulaSimulationResult:
"""Simulate a WoT threshold formula computation.
Pure calculation endpoint -- no database access needed.
Accepts WoT size, total votes, and formula parameters,
returns the computed thresholds and derived values.
"""
# Compute WoT threshold
wot_thresh = compute_wot_threshold(
wot_size=payload.wot_size,
total_votes=payload.total_votes,
majority_pct=payload.majority_pct,
base_exponent=payload.base_exponent,
gradient_exponent=payload.gradient_exponent,
constant_base=payload.constant_base,
)
# Compute derived values for transparency
w = payload.wot_size
t = payload.total_votes
m = payload.majority_pct / 100.0
g = payload.gradient_exponent
participation_rate = t / w if w > 0 else 0.0
turnout_ratio = min(t / w, 1.0) if w > 0 else 0.0
inertia_factor = 1.0 - turnout_ratio ** g if t > 0 else 1.0
required_ratio = m + (1.0 - m) * inertia_factor
# Smith threshold (optional)
smith_thresh = None
if payload.smith_wot_size is not None and payload.smith_exponent is not None:
smith_thresh = compute_smith_threshold(payload.smith_wot_size, payload.smith_exponent)
# TechComm threshold (optional)
techcomm_thresh = None
if payload.techcomm_size is not None and payload.techcomm_exponent is not None:
techcomm_thresh = compute_techcomm_threshold(payload.techcomm_size, payload.techcomm_exponent)
return FormulaSimulationResult(
wot_threshold=wot_thresh,
smith_threshold=smith_thresh,
techcomm_threshold=techcomm_thresh,
participation_rate=round(participation_rate, 6),
required_ratio=round(required_ratio, 6),
inertia_factor=round(inertia_factor, 6),
)

View File

@@ -1,4 +1,4 @@
"""Votes router: vote sessions, individual votes, and result computation."""
"""Votes router: vote sessions, individual votes, result computation, and live updates."""
from __future__ import annotations
@@ -15,8 +15,22 @@ from app.database import get_db
from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.user import DuniterIdentity
from app.models.vote import Vote, VoteSession
from app.schemas.vote import VoteCreate, VoteOut, VoteSessionCreate, VoteSessionOut
from app.routers.websocket import manager
from app.schemas.vote import (
ThresholdDetailOut,
VoteCreate,
VoteOut,
VoteResultOut,
VoteSessionCreate,
VoteSessionListOut,
VoteSessionOut,
)
from app.services.auth_service import get_current_identity
from app.services.vote_service import (
close_session as svc_close_session,
compute_result as svc_compute_result,
get_threshold_details as svc_get_threshold_details,
)
router = APIRouter()
@@ -50,6 +64,37 @@ async def _get_protocol_with_formula(db: AsyncSession, protocol_id: uuid.UUID) -
return protocol
async def _check_session_expired(session: VoteSession, db: AsyncSession) -> VoteSession:
"""Check if a session has passed its ends_at deadline and auto-close it.
If the session is still marked 'open' but the deadline has passed,
close it and compute the final tally via the vote service.
Returns the (possibly updated) session.
"""
if session.status == "open" and datetime.now(timezone.utc) > session.ends_at:
try:
result = await svc_close_session(session.id, db)
# Reload session to get updated fields
db_result = await db.execute(
select(VoteSession)
.options(selectinload(VoteSession.votes))
.where(VoteSession.id == session.id)
)
session = db_result.scalar_one()
# Broadcast session closed event
await manager.broadcast(session.id, {
"type": "session_closed",
"session_id": str(session.id),
"result": result.get("result"),
"votes_for": result.get("votes_for", 0),
"votes_against": result.get("votes_against", 0),
"votes_total": result.get("votes_total", 0),
})
except ValueError:
pass # Session already closed by another process
return session
def _compute_threshold(formula: FormulaConfig, wot_size: int, votes_total: int) -> float:
"""Compute the WoT-based threshold using the core formula.
@@ -122,6 +167,35 @@ def _compute_result(
# ── Routes ──────────────────────────────────────────────────────────────────
@router.get("/sessions", response_model=list[VoteSessionListOut])
async def list_vote_sessions(
db: AsyncSession = Depends(get_db),
session_status: str | None = Query(default=None, alias="status", description="Filtrer par statut (open, closed, tallied)"),
decision_id: uuid.UUID | None = Query(default=None, description="Filtrer par decision_id"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[VoteSessionListOut]:
"""List all vote sessions with optional filters by status and decision_id."""
stmt = select(VoteSession)
if session_status is not None:
stmt = stmt.where(VoteSession.status == session_status)
if decision_id is not None:
stmt = stmt.where(VoteSession.decision_id == decision_id)
stmt = stmt.order_by(VoteSession.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
sessions = result.scalars().all()
# Auto-close expired sessions before returning
checked_sessions = []
for s in sessions:
s = await _check_session_expired(s, db)
checked_sessions.append(s)
return [VoteSessionListOut.model_validate(s) for s in checked_sessions]
@router.post("/sessions", response_model=VoteSessionOut, status_code=status.HTTP_201_CREATED)
async def create_vote_session(
payload: VoteSessionCreate,
@@ -163,11 +237,52 @@ async def get_vote_session(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> VoteSessionOut:
"""Get a vote session with current tallies."""
"""Get a vote session with current tallies.
Automatically closes the session if its deadline has passed.
"""
session = await _get_session(db, id)
session = await _check_session_expired(session, db)
return VoteSessionOut.model_validate(session)
@router.post("/sessions/{id}/close", response_model=VoteResultOut)
async def close_vote_session(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteResultOut:
"""Manually close a vote session and compute the final result.
Requires authentication. The session must be in 'open' status.
"""
try:
result = await svc_close_session(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Broadcast session closed event via WebSocket
await manager.broadcast(id, {
"type": "session_closed",
"session_id": str(id),
"result": result.get("result"),
"votes_for": result.get("votes_for", 0),
"votes_against": result.get("votes_against", 0),
"votes_total": result.get("votes_total", 0),
})
return VoteResultOut(
session_id=id,
result=result.get("result"),
threshold_required=float(result.get("threshold", 0)),
votes_for=result.get("votes_for", 0),
votes_against=result.get("votes_against", 0),
votes_total=result.get("votes_total", 0),
adopted=result.get("adopted", False),
nuanced_breakdown=result.get("nuanced_breakdown"),
)
@router.post("/sessions/{id}/vote", response_model=VoteOut, status_code=status.HTTP_201_CREATED)
async def submit_vote(
id: uuid.UUID,
@@ -179,9 +294,13 @@ async def submit_vote(
Each identity can only vote once per session. Submitting again replaces the previous vote.
The vote must include a cryptographic signature for on-chain proof.
After submission, broadcasts a vote_update event via WebSocket.
"""
session = await _get_session(db, id)
# Auto-close check
session = await _check_session_expired(session, db)
# Verify session is open
if session.status != "open":
raise HTTPException(
@@ -249,6 +368,15 @@ async def submit_vote(
await db.commit()
await db.refresh(vote)
# Broadcast vote update via WebSocket
await manager.broadcast(session.id, {
"type": "vote_update",
"session_id": str(session.id),
"votes_for": session.votes_for,
"votes_against": session.votes_against,
"votes_total": session.votes_total,
})
return VoteOut.model_validate(vote)
@@ -273,6 +401,28 @@ async def list_votes(
return [VoteOut.model_validate(v) for v in votes]
@router.get("/sessions/{id}/threshold", response_model=ThresholdDetailOut)
async def get_threshold_details(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> ThresholdDetailOut:
"""Return computed threshold details for a vote session.
Includes WoT/Smith/TechComm thresholds, pass/fail status,
participation rate, and the formula parameters used.
Automatically closes the session if its deadline has passed.
"""
session = await _get_session(db, id)
session = await _check_session_expired(session, db)
try:
details = await svc_get_threshold_details(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc))
return ThresholdDetailOut(**details)
@router.get("/sessions/{id}/result")
async def get_vote_result(
id: uuid.UUID,
@@ -282,8 +432,10 @@ async def get_vote_result(
Uses the WoT threshold formula linked through the voting protocol.
Returns current tallies, computed threshold, and whether the vote passes.
Automatically closes the session if its deadline has passed.
"""
session = await _get_session(db, id)
session = await _check_session_expired(session, db)
# Get the protocol and formula
protocol = await _get_protocol_with_formula(db, session.voting_protocol_id)
@@ -304,3 +456,41 @@ async def get_vote_result(
"techcomm_votes_for": session.techcomm_votes_for,
**result_data,
}
@router.post("/sessions/{id}/tally", response_model=VoteResultOut)
async def force_tally(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteResultOut:
"""Force a recount of a vote session.
Requires authentication. Useful after a chain snapshot update
or when recalculation is needed. Works on any session status.
"""
try:
result = await svc_compute_result(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Broadcast tally update via WebSocket
await manager.broadcast(id, {
"type": "tally_update",
"session_id": str(id),
"result": result.get("result"),
"votes_for": result.get("votes_for", 0),
"votes_against": result.get("votes_against", 0),
"votes_total": result.get("votes_total", 0),
})
return VoteResultOut(
session_id=id,
result=result.get("result"),
threshold_required=float(result.get("threshold", 0)),
votes_for=result.get("votes_for", 0),
votes_against=result.get("votes_against", 0),
votes_total=result.get("votes_total", 0),
adopted=result.get("adopted", False),
nuanced_breakdown=result.get("nuanced_breakdown"),
)

View File

@@ -33,6 +33,25 @@ class FormulaConfigCreate(BaseModel):
nuanced_threshold_pct: int | None = Field(default=None, ge=0, le=100, description="Threshold percentage for nuanced vote")
class FormulaConfigUpdate(BaseModel):
"""Partial update payload for a formula configuration (all fields optional)."""
name: str | None = Field(default=None, min_length=1, max_length=128)
description: str | None = None
duration_days: int | None = Field(default=None, ge=1)
majority_pct: int | None = Field(default=None, ge=1, le=100)
base_exponent: float | None = Field(default=None, ge=0.0, le=1.0)
gradient_exponent: float | None = Field(default=None, ge=0.0, le=2.0)
constant_base: float | None = Field(default=None, ge=0.0, le=1.0)
smith_exponent: float | None = Field(default=None, ge=0.0, le=1.0)
techcomm_exponent: float | None = Field(default=None, ge=0.0, le=1.0)
nuanced_min_participants: int | None = Field(default=None, ge=0)
nuanced_threshold_pct: int | None = Field(default=None, ge=0, le=100)
class FormulaConfigOut(BaseModel):
"""Full formula configuration representation."""
@@ -67,6 +86,16 @@ class VotingProtocolCreate(BaseModel):
is_meta_governed: bool = Field(default=False, description="Whether this protocol is itself governed by meta-vote")
class VotingProtocolUpdate(BaseModel):
"""Partial update payload for a voting protocol (meta-governance)."""
name: str | None = Field(default=None, min_length=1, max_length=128)
description: str | None = None
vote_type: str | None = Field(default=None, max_length=32)
mode_params: str | None = Field(default=None, max_length=64)
is_meta_governed: bool | None = None
class VotingProtocolOut(BaseModel):
"""Full voting protocol representation including formula config."""
@@ -81,3 +110,32 @@ class VotingProtocolOut(BaseModel):
is_meta_governed: bool
created_at: datetime
formula_config: FormulaConfigOut
# ── Formula Simulation ───────────────────────────────────────────
class FormulaSimulationRequest(BaseModel):
"""Request payload for simulating a WoT threshold formula computation."""
wot_size: int = Field(..., ge=1, description="Size of the WoT (eligible voter corpus)")
total_votes: int = Field(..., ge=0, description="Number of votes cast (for + against)")
majority_pct: int = Field(default=50, ge=1, le=100, description="Majority percentage M")
base_exponent: float = Field(default=0.1, ge=0.0, le=1.0, description="Base exponent B")
gradient_exponent: float = Field(default=0.2, ge=0.0, le=2.0, description="Gradient exponent G")
constant_base: float = Field(default=0.0, ge=0.0, le=1.0, description="Constant base C")
smith_wot_size: int | None = Field(default=None, ge=1, description="Smith sub-WoT size")
smith_exponent: float | None = Field(default=None, ge=0.0, le=1.0, description="Smith exponent S")
techcomm_size: int | None = Field(default=None, ge=1, description="TechComm size")
techcomm_exponent: float | None = Field(default=None, ge=0.0, le=1.0, description="TechComm exponent T")
class FormulaSimulationResult(BaseModel):
"""Result of a formula simulation."""
wot_threshold: int
smith_threshold: int | None = None
techcomm_threshold: int | None = None
participation_rate: float
required_ratio: float
inertia_factor: float

View File

@@ -55,6 +55,74 @@ class VoteSessionOut(BaseModel):
created_at: datetime
class VoteSessionListOut(BaseModel):
"""Lighter vote session representation for list endpoints (no nested votes)."""
model_config = ConfigDict(from_attributes=True)
id: UUID
decision_id: UUID | None = None
item_version_id: UUID | None = None
voting_protocol_id: UUID
# Snapshot at session start
wot_size: int
smith_size: int
techcomm_size: int
# Dates
starts_at: datetime
ends_at: datetime
# Status
status: str
# Tallies
votes_for: int
votes_against: int
votes_total: int
threshold_required: float
result: str | None = None
created_at: datetime
# ── Threshold Details ────────────────────────────────────────────
class ThresholdDetailOut(BaseModel):
"""Detailed threshold computation result for a vote session."""
wot_threshold: int
smith_threshold: int | None = None
techcomm_threshold: int | None = None
votes_for: int
votes_against: int
votes_total: int
wot_passed: bool
smith_passed: bool | None = None
techcomm_passed: bool | None = None
overall_passed: bool
participation_rate: float
formula_params: dict
# ── Vote Result ──────────────────────────────────────────────────
class VoteResultOut(BaseModel):
"""Structured vote result response."""
session_id: UUID
result: str | None = None # adopted, rejected
threshold_required: float
votes_for: int
votes_against: int
votes_total: int
adopted: bool
nuanced_breakdown: dict | None = None # for nuanced votes
# ── Vote ─────────────────────────────────────────────────────────

View File

@@ -1,8 +1,9 @@
"""Vote service: compute results and verify vote signatures."""
"""Vote service: compute results, close sessions, threshold details, and signature verification."""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -17,6 +18,52 @@ from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.vote import Vote, VoteSession
# ── Helpers ──────────────────────────────────────────────────────
async def _load_session_with_votes(db: AsyncSession, session_id: uuid.UUID) -> VoteSession:
"""Load a vote session with its votes eagerly, raising ValueError if not found."""
result = await db.execute(
select(VoteSession)
.options(selectinload(VoteSession.votes))
.where(VoteSession.id == session_id)
)
session = result.scalar_one_or_none()
if session is None:
raise ValueError(f"Session de vote introuvable : {session_id}")
return session
async def _load_protocol_with_formula(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProtocol:
"""Load a voting protocol with its formula config, raising ValueError if not found."""
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError(f"Protocole de vote introuvable : {protocol_id}")
return protocol
def _extract_params(protocol: VotingProtocol, formula: FormulaConfig) -> dict:
"""Extract formula parameters from mode_params or formula_config."""
if protocol.mode_params:
return parse_mode_params(protocol.mode_params)
return {
"majority_pct": formula.majority_pct,
"base_exponent": formula.base_exponent,
"gradient_exponent": formula.gradient_exponent,
"constant_base": formula.constant_base,
"smith_exponent": formula.smith_exponent,
"techcomm_exponent": formula.techcomm_exponent,
}
# ── Compute Result ───────────────────────────────────────────────
async def compute_result(session_id: uuid.UUID, db: AsyncSession) -> dict:
"""Load a vote session, its protocol and formula, compute thresholds, and tally.
@@ -33,40 +80,10 @@ async def compute_result(session_id: uuid.UUID, db: AsyncSession) -> dict:
Result dict with keys: threshold, votes_for, votes_against,
votes_total, adopted, smith_ok, techcomm_ok, details.
"""
# Load session with votes eagerly
result = await db.execute(
select(VoteSession)
.options(selectinload(VoteSession.votes))
.where(VoteSession.id == session_id)
)
session = result.scalar_one_or_none()
if session is None:
raise ValueError(f"Session de vote introuvable : {session_id}")
# Load protocol + formula config
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == session.voting_protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError(f"Protocole de vote introuvable pour la session {session_id}")
session = await _load_session_with_votes(db, session_id)
protocol = await _load_protocol_with_formula(db, session.voting_protocol_id)
formula: FormulaConfig = protocol.formula_config
# If mode_params is set on the protocol, it overrides formula_config values
if protocol.mode_params:
params = parse_mode_params(protocol.mode_params)
else:
params = {
"majority_pct": formula.majority_pct,
"base_exponent": formula.base_exponent,
"gradient_exponent": formula.gradient_exponent,
"constant_base": formula.constant_base,
"smith_exponent": formula.smith_exponent,
"techcomm_exponent": formula.techcomm_exponent,
}
params = _extract_params(protocol, formula)
# Separate vote types
active_votes: list[Vote] = [v for v in session.votes if v.is_active]
@@ -138,7 +155,7 @@ async def _compute_nuanced(
params: dict,
db: AsyncSession,
) -> dict:
"""Compute a nuanced vote result."""
"""Compute a nuanced vote result with full per-level breakdown."""
vote_levels = [v.nuanced_level for v in active_votes if v.nuanced_level is not None]
threshold_pct = formula.nuanced_threshold_pct or 80
@@ -163,10 +180,157 @@ async def _compute_nuanced(
return {
"vote_type": "nuanced",
"result": vote_result,
"nuanced_breakdown": {
"per_level_counts": evaluation["per_level_counts"],
"positive_count": evaluation["positive_count"],
"positive_pct": evaluation["positive_pct"],
"threshold_met": evaluation["threshold_met"],
"min_participants_met": evaluation["min_participants_met"],
"threshold_pct": threshold_pct,
"min_participants": min_participants,
},
**evaluation,
}
# ── Close Session ────────────────────────────────────────────────
async def close_session(session_id: uuid.UUID, db: AsyncSession) -> dict:
"""Close a vote session and compute its final result.
Parameters
----------
session_id:
UUID of the VoteSession to close.
db:
Async database session.
Returns
-------
dict
Result dict from compute_result, augmented with close metadata.
Raises
------
ValueError
If the session is not found or already closed/tallied.
"""
session = await _load_session_with_votes(db, session_id)
if session.status not in ("open",):
raise ValueError(
f"Impossible de fermer la session {session_id} : statut actuel '{session.status}'"
)
# Mark as closed before tallying
session.status = "closed"
await db.commit()
# Compute the final result (this will set status to "tallied")
result = await compute_result(session_id, db)
return {
"closed_at": datetime.now(timezone.utc).isoformat(),
**result,
}
# ── Threshold Details ────────────────────────────────────────────
async def get_threshold_details(session_id: uuid.UUID, db: AsyncSession) -> dict:
"""Return detailed threshold computation for a vote session.
Parameters
----------
session_id:
UUID of the VoteSession.
db:
Async database session.
Returns
-------
dict
Detailed breakdown including wot/smith/techcomm thresholds,
pass/fail booleans, participation rate, and formula params.
"""
session = await _load_session_with_votes(db, session_id)
protocol = await _load_protocol_with_formula(db, session.voting_protocol_id)
formula: FormulaConfig = protocol.formula_config
params = _extract_params(protocol, formula)
active_votes: list[Vote] = [v for v in session.votes if v.is_active]
votes_for = sum(1 for v in active_votes if v.vote_value == "for")
votes_against = sum(1 for v in active_votes if v.vote_value == "against")
total = votes_for + votes_against
# WoT threshold
wot_thresh = wot_threshold(
wot_size=session.wot_size,
total_votes=total,
majority_pct=params.get("majority_pct", 50),
base_exponent=params.get("base_exponent", 0.1),
gradient_exponent=params.get("gradient_exponent", 0.2),
constant_base=params.get("constant_base", 0.0),
)
wot_passed = votes_for >= wot_thresh
# Smith criterion (optional)
smith_thresh = None
smith_passed = None
if params.get("smith_exponent") is not None and session.smith_size > 0:
smith_thresh = smith_threshold(session.smith_size, params["smith_exponent"])
smith_votes = sum(1 for v in active_votes if v.voter_is_smith and v.vote_value == "for")
smith_passed = smith_votes >= smith_thresh
# TechComm criterion (optional)
techcomm_thresh = None
techcomm_passed = None
if params.get("techcomm_exponent") is not None and session.techcomm_size > 0:
techcomm_thresh = techcomm_threshold(session.techcomm_size, params["techcomm_exponent"])
techcomm_votes = sum(1 for v in active_votes if v.voter_is_techcomm and v.vote_value == "for")
techcomm_passed = techcomm_votes >= techcomm_thresh
# Overall pass: all applicable criteria must pass
overall_passed = wot_passed
if smith_passed is not None:
overall_passed = overall_passed and smith_passed
if techcomm_passed is not None:
overall_passed = overall_passed and techcomm_passed
# Participation rate
participation_rate = (total / session.wot_size) if session.wot_size > 0 else 0.0
# Formula params used
formula_params = {
"M": params.get("majority_pct", 50),
"B": params.get("base_exponent", 0.1),
"G": params.get("gradient_exponent", 0.2),
"C": params.get("constant_base", 0.0),
"S": params.get("smith_exponent"),
"T": params.get("techcomm_exponent"),
}
return {
"wot_threshold": wot_thresh,
"smith_threshold": smith_thresh,
"techcomm_threshold": techcomm_thresh,
"votes_for": votes_for,
"votes_against": votes_against,
"votes_total": total,
"wot_passed": wot_passed,
"smith_passed": smith_passed,
"techcomm_passed": techcomm_passed,
"overall_passed": overall_passed,
"participation_rate": round(participation_rate, 6),
"formula_params": formula_params,
}
# ── Signature Verification ───────────────────────────────────────
async def verify_vote_signature(address: str, signature: str, payload: str) -> bool:
"""Verify an Ed25519 signature from a Duniter V2 address.

View File

@@ -0,0 +1,478 @@
"""Tests for vote service engine functions: threshold details, session logic, nuanced evaluation, and simulation.
All tests are pure unit tests that exercise the engine functions directly
without any database dependency.
Real-world reference case:
Vote Engagement Forgeron v2.0.0 (Feb 2026)
wot_size=7224, votes_for=97, votes_against=23, total=120
params M=50, B=0.1, G=0.2 => threshold=94 => adopted (97 >= 94)
"""
from __future__ import annotations
import math
import pytest
from app.engine.nuanced_vote import evaluate_nuanced
from app.engine.smith_threshold import smith_threshold
from app.engine.techcomm_threshold import techcomm_threshold
from app.engine.threshold import wot_threshold
# ---------------------------------------------------------------------------
# Threshold details computation: real Forgeron case (97/23/7224)
# ---------------------------------------------------------------------------
class TestThresholdDetailsForgeron:
"""Simulate the threshold details computation that the service would return
for the real Engagement Forgeron v2.0.0 vote."""
WOT_SIZE = 7224
VOTES_FOR = 97
VOTES_AGAINST = 23
TOTAL = 120
SMITH_SIZE = 20
SMITH_EXPONENT = 0.1
TECHCOMM_SIZE = 5
TECHCOMM_EXPONENT = 0.1
def _compute_details(self) -> dict:
"""Reproduce the threshold details logic from vote_service.get_threshold_details."""
wot_thresh = wot_threshold(
wot_size=self.WOT_SIZE,
total_votes=self.TOTAL,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
constant_base=0.0,
)
wot_passed = self.VOTES_FOR >= wot_thresh
smith_thresh = smith_threshold(self.SMITH_SIZE, self.SMITH_EXPONENT)
# Assume all smith members voted for
smith_votes_for = 5
smith_passed = smith_votes_for >= smith_thresh
techcomm_thresh = techcomm_threshold(self.TECHCOMM_SIZE, self.TECHCOMM_EXPONENT)
# Assume 2 techcomm members voted for
techcomm_votes_for = 2
techcomm_passed = techcomm_votes_for >= techcomm_thresh
overall_passed = wot_passed and smith_passed and techcomm_passed
participation_rate = self.TOTAL / self.WOT_SIZE
return {
"wot_threshold": wot_thresh,
"smith_threshold": smith_thresh,
"techcomm_threshold": techcomm_thresh,
"votes_for": self.VOTES_FOR,
"votes_against": self.VOTES_AGAINST,
"votes_total": self.TOTAL,
"wot_passed": wot_passed,
"smith_passed": smith_passed,
"techcomm_passed": techcomm_passed,
"overall_passed": overall_passed,
"participation_rate": round(participation_rate, 6),
"formula_params": {
"M": 50, "B": 0.1, "G": 0.2, "C": 0.0,
"S": self.SMITH_EXPONENT, "T": self.TECHCOMM_EXPONENT,
},
}
def test_wot_threshold_value(self):
"""WoT threshold for Forgeron vote should be in the expected range."""
details = self._compute_details()
assert 80 <= details["wot_threshold"] <= 120
# 97 votes_for must pass
assert details["wot_passed"] is True
def test_smith_threshold_value(self):
"""Smith threshold: ceil(20^0.1) = ceil(1.35) = 2."""
details = self._compute_details()
assert details["smith_threshold"] == 2
assert details["smith_passed"] is True
def test_techcomm_threshold_value(self):
"""TechComm threshold: ceil(5^0.1) = ceil(1.175) = 2."""
details = self._compute_details()
assert details["techcomm_threshold"] == 2
assert details["techcomm_passed"] is True
def test_overall_pass(self):
"""All three criteria pass => overall adopted."""
details = self._compute_details()
assert details["overall_passed"] is True
def test_participation_rate(self):
"""Participation rate for 120/7224 ~ 1.66%."""
details = self._compute_details()
expected = round(120 / 7224, 6)
assert details["participation_rate"] == expected
assert details["participation_rate"] < 0.02 # less than 2%
def test_formula_params_present(self):
"""All formula params must be present and correct."""
details = self._compute_details()
params = details["formula_params"]
assert params["M"] == 50
assert params["B"] == 0.1
assert params["G"] == 0.2
assert params["C"] == 0.0
assert params["S"] == 0.1
assert params["T"] == 0.1
# ---------------------------------------------------------------------------
# Close session behavior (engine-level logic)
# ---------------------------------------------------------------------------
class TestCloseSessionLogic:
"""Test the logic that would execute when a session is closed:
computing the final tally and determining adopted/rejected."""
def test_binary_vote_adopted(self):
"""97 for / 23 against out of 7224 WoT => adopted."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
adopted = 97 >= threshold
assert adopted is True
result = "adopted" if adopted else "rejected"
assert result == "adopted"
def test_binary_vote_rejected(self):
"""50 for / 70 against out of 7224 WoT => rejected."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
adopted = 50 >= threshold
assert adopted is False
result = "adopted" if adopted else "rejected"
assert result == "rejected"
def test_close_with_zero_votes(self):
"""Session with 0 votes => threshold is ~0 (B^W), effectively rejected
because 0 votes_for cannot meet even a tiny threshold."""
threshold = wot_threshold(
wot_size=7224,
total_votes=0,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# B^W = 0.1^7224 -> effectively 0, ceil(0) = 0
# But with 0 votes_for, 0 >= 0 is True
# This is a degenerate case; in practice sessions with 0 votes
# would be marked invalid
assert threshold == 0 or threshold == math.ceil(0.1 ** 7224)
def test_close_high_participation(self):
"""3000/7224 participating, 2500 for / 500 against => should pass.
At ~41.5% participation with G=0.2, the inertia factor is low,
so a strong supermajority should pass.
"""
threshold = wot_threshold(
wot_size=7224,
total_votes=3000,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# At ~41.5% participation, threshold is lower than near-unanimity
# but still above simple majority (1500)
assert threshold > 1500, f"Threshold {threshold} should be above simple majority"
assert threshold < 3000, f"Threshold {threshold} should be below total votes"
adopted = 2500 >= threshold
assert adopted is True
def test_close_barely_fails(self):
"""Use exact threshold to verify borderline rejection."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# votes_for = threshold - 1 => should fail
barely_fail = (threshold - 1) >= threshold
assert barely_fail is False
def test_close_smith_criterion_blocks(self):
"""Even if WoT passes, failing Smith criterion blocks adoption."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
wot_pass = 97 >= threshold
smith_required = smith_threshold(20, 0.1) # ceil(20^0.1) = 2
smith_ok = 1 >= smith_required # Only 1 smith voted -> fails
adopted = wot_pass and smith_ok
assert wot_pass is True
assert smith_ok is False
assert adopted is False
# ---------------------------------------------------------------------------
# Nuanced vote result evaluation
# ---------------------------------------------------------------------------
class TestNuancedVoteResult:
"""Test nuanced vote evaluation with per-level breakdown,
as would be returned by compute_result for nuanced votes."""
def test_nuanced_adopted_with_breakdown(self):
"""Standard nuanced vote that passes threshold and min_participants."""
votes = [5] * 30 + [4] * 20 + [3] * 15 + [2] * 5 + [1] * 3 + [0] * 2
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
# Total = 75, positive = 30+20+15 = 65
assert result["total"] == 75
assert result["positive_count"] == 65
assert result["positive_pct"] == pytest.approx(86.67, abs=0.1)
assert result["adopted"] is True
# Verify per-level breakdown
assert result["per_level_counts"][5] == 30
assert result["per_level_counts"][4] == 20
assert result["per_level_counts"][3] == 15
assert result["per_level_counts"][2] == 5
assert result["per_level_counts"][1] == 3
assert result["per_level_counts"][0] == 2
def test_nuanced_rejected_threshold_not_met(self):
"""Nuanced vote where positive percentage is below threshold."""
votes = [5] * 10 + [4] * 10 + [3] * 10 + [2] * 15 + [1] * 15 + [0] * 10
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
# Total = 70, positive = 10+10+10 = 30
assert result["total"] == 70
assert result["positive_count"] == 30
assert result["positive_pct"] == pytest.approx(42.86, abs=0.1)
assert result["threshold_met"] is False
assert result["min_participants_met"] is True
assert result["adopted"] is False
def test_nuanced_all_neutre(self):
"""All voters at level 3 (NEUTRE) still counts as positive."""
votes = [3] * 60
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 60
assert result["positive_count"] == 60
assert result["positive_pct"] == 100.0
assert result["adopted"] is True
def test_nuanced_mixed_heavy_negative(self):
"""Majority at levels 0-2 => rejected."""
votes = [0] * 20 + [1] * 20 + [2] * 15 + [3] * 2 + [4] * 1 + [5] * 1
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
# Total = 59, positive = 2+1+1 = 4
assert result["total"] == 59
assert result["positive_count"] == 4
assert result["positive_pct"] < 10
assert result["adopted"] is False
def test_nuanced_breakdown_dict_structure(self):
"""Verify the breakdown structure matches what the service builds."""
votes = [5] * 20 + [4] * 20 + [3] * 19 + [2] * 5 + [1] * 3 + [0] * 2
evaluation = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
# Simulate the nuanced_breakdown dict the service builds
nuanced_breakdown = {
"per_level_counts": evaluation["per_level_counts"],
"positive_count": evaluation["positive_count"],
"positive_pct": evaluation["positive_pct"],
"threshold_met": evaluation["threshold_met"],
"min_participants_met": evaluation["min_participants_met"],
"threshold_pct": 80,
"min_participants": 59,
}
assert "per_level_counts" in nuanced_breakdown
assert nuanced_breakdown["threshold_pct"] == 80
assert nuanced_breakdown["min_participants"] == 59
assert nuanced_breakdown["positive_count"] == 59
assert nuanced_breakdown["threshold_met"] is True
# ---------------------------------------------------------------------------
# Simulation endpoint logic (pure engine functions)
# ---------------------------------------------------------------------------
class TestSimulationLogic:
"""Test the formula simulation that the /simulate endpoint performs,
exercising all three threshold engine functions together."""
def test_simulation_forgeron_case(self):
"""Simulate the Forgeron vote: wot=7224, total=120, M50 B.1 G.2."""
wot_thresh = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# Derived values (matching the simulate endpoint logic)
w = 7224
t = 120
m = 0.5
g = 0.2
participation_rate = t / w
turnout_ratio = min(t / w, 1.0)
inertia_factor = 1.0 - turnout_ratio ** g
required_ratio = m + (1.0 - m) * inertia_factor
assert 80 <= wot_thresh <= 120
assert participation_rate == pytest.approx(0.016611, abs=0.001)
assert 0 < inertia_factor < 1
assert required_ratio > m # Inertia pushes above simple majority
def test_simulation_full_participation(self):
"""At 100% participation, threshold approaches simple majority."""
wot_size = 100
total = 100
wot_thresh = wot_threshold(
wot_size=wot_size,
total_votes=total,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# At full participation, turnout_ratio = 1.0
# inertia_factor = 1.0 - 1.0^0.2 = 0
# required_ratio = 0.5 + 0.5*0 = 0.5
# threshold = 0 + 0.1^100 + 0.5 * 100 = ~50
assert wot_thresh == math.ceil(0.1 ** 100 + 0.5 * 100)
assert wot_thresh == 50
def test_simulation_with_smith_and_techcomm(self):
"""Simulate with all three criteria."""
wot_thresh = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
smith_thresh = smith_threshold(smith_wot_size=20, exponent=0.1)
techcomm_thresh = techcomm_threshold(cotec_size=5, exponent=0.1)
assert wot_thresh > 0
assert smith_thresh == 2 # ceil(20^0.1) = ceil(1.35) = 2
assert techcomm_thresh == 2 # ceil(5^0.1) = ceil(1.175) = 2
def test_simulation_varying_majority(self):
"""Higher majority_pct increases the threshold at full participation."""
thresh_50 = wot_threshold(wot_size=100, total_votes=100, majority_pct=50)
thresh_66 = wot_threshold(wot_size=100, total_votes=100, majority_pct=66)
thresh_80 = wot_threshold(wot_size=100, total_votes=100, majority_pct=80)
assert thresh_50 < thresh_66 < thresh_80
def test_simulation_varying_gradient(self):
"""Higher gradient exponent means more inertia at partial participation.
With T/W < 1, a higher G makes (T/W)^G smaller, so
inertia_factor = 1 - (T/W)^G becomes larger, raising the threshold.
At 50% participation: (0.5)^0.2 ~ 0.87, (0.5)^1.0 = 0.5.
"""
thresh_g02 = wot_threshold(
wot_size=200, total_votes=100,
majority_pct=50, gradient_exponent=0.2,
)
thresh_g10 = wot_threshold(
wot_size=200, total_votes=100,
majority_pct=50, gradient_exponent=1.0,
)
# Higher G = more inertia at partial participation = higher threshold
assert thresh_g10 > thresh_g02
def test_simulation_zero_votes(self):
"""Zero votes produces a minimal threshold."""
thresh = wot_threshold(
wot_size=7224,
total_votes=0,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# B^W = 0.1^7224 ~ 0, ceil(0) = 0
assert thresh == 0
def test_simulation_small_wot(self):
"""Small WoT size (e.g. 5 members)."""
thresh = wot_threshold(
wot_size=5,
total_votes=3,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# With 3/5 participation, should require more than simple majority of 3
assert thresh >= 2
assert thresh <= 3
def test_simulation_result_structure(self):
"""Verify the simulation produces all expected values for the API response."""
w = 7224
t = 120
m_pct = 50
m = m_pct / 100.0
b = 0.1
g = 0.2
c = 0.0
wot_thresh = wot_threshold(
wot_size=w, total_votes=t,
majority_pct=m_pct, base_exponent=b,
gradient_exponent=g, constant_base=c,
)
smith_thresh = smith_threshold(smith_wot_size=20, exponent=0.1)
techcomm_thresh = techcomm_threshold(cotec_size=5, exponent=0.1)
participation_rate = t / w
turnout_ratio = min(t / w, 1.0)
inertia_factor = 1.0 - turnout_ratio ** g
required_ratio = m + (1.0 - m) * inertia_factor
# Build the simulation result dict (matching FormulaSimulationResult schema)
result = {
"wot_threshold": wot_thresh,
"smith_threshold": smith_thresh,
"techcomm_threshold": techcomm_thresh,
"participation_rate": round(participation_rate, 6),
"required_ratio": round(required_ratio, 6),
"inertia_factor": round(inertia_factor, 6),
}
assert isinstance(result["wot_threshold"], int)
assert isinstance(result["smith_threshold"], int)
assert isinstance(result["techcomm_threshold"], int)
assert 0 < result["participation_rate"] < 1
assert 0 < result["required_ratio"] <= 1
assert 0 < result["inertia_factor"] < 1