"""Vote service: compute results, close sessions, threshold details, and signature verification.""" from __future__ import annotations import uuid from datetime import datetime, timezone from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.engine.mode_params import parse_mode_params from app.engine.nuanced_vote import evaluate_nuanced from app.engine.smith_threshold import smith_threshold from app.engine.techcomm_threshold import techcomm_threshold from app.engine.threshold import wot_threshold from app.models.protocol import FormulaConfig, VotingProtocol from app.models.vote import Vote, VoteSession # ── Helpers ────────────────────────────────────────────────────── async def _load_session_with_votes(db: AsyncSession, session_id: uuid.UUID) -> VoteSession: """Load a vote session with its votes eagerly, raising ValueError if not found.""" result = await db.execute( select(VoteSession) .options(selectinload(VoteSession.votes)) .where(VoteSession.id == session_id) ) session = result.scalar_one_or_none() if session is None: raise ValueError(f"Session de vote introuvable : {session_id}") return session async def _load_protocol_with_formula(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProtocol: """Load a voting protocol with its formula config, raising ValueError if not found.""" proto_result = await db.execute( select(VotingProtocol) .options(selectinload(VotingProtocol.formula_config)) .where(VotingProtocol.id == protocol_id) ) protocol = proto_result.scalar_one_or_none() if protocol is None: raise ValueError(f"Protocole de vote introuvable : {protocol_id}") return protocol def _extract_params(protocol: VotingProtocol, formula: FormulaConfig) -> dict: """Extract formula parameters from mode_params or formula_config.""" if protocol.mode_params: return parse_mode_params(protocol.mode_params) return { "majority_pct": formula.majority_pct, "base_exponent": formula.base_exponent, "gradient_exponent": formula.gradient_exponent, "constant_base": formula.constant_base, "smith_exponent": formula.smith_exponent, "techcomm_exponent": formula.techcomm_exponent, } # ── Compute Result ─────────────────────────────────────────────── async def compute_result(session_id: uuid.UUID, db: AsyncSession) -> dict: """Load a vote session, its protocol and formula, compute thresholds, and tally. Parameters ---------- session_id: UUID of the VoteSession to tally. db: Async database session. Returns ------- dict Result dict with keys: threshold, votes_for, votes_against, votes_total, adopted, smith_ok, techcomm_ok, details. """ session = await _load_session_with_votes(db, session_id) protocol = await _load_protocol_with_formula(db, session.voting_protocol_id) formula: FormulaConfig = protocol.formula_config params = _extract_params(protocol, formula) # Separate vote types active_votes: list[Vote] = [v for v in session.votes if v.is_active] if protocol.vote_type == "nuanced": return await _compute_nuanced(session, active_votes, formula, params, db) # --- Binary vote --- votes_for = sum(1 for v in active_votes if v.vote_value == "for") votes_against = sum(1 for v in active_votes if v.vote_value == "against") total = votes_for + votes_against # WoT threshold threshold = wot_threshold( wot_size=session.wot_size, total_votes=total, majority_pct=params.get("majority_pct", 50), base_exponent=params.get("base_exponent", 0.1), gradient_exponent=params.get("gradient_exponent", 0.2), constant_base=params.get("constant_base", 0.0), ) # Smith criterion (optional) smith_ok = True smith_required = None if params.get("smith_exponent") is not None and session.smith_size > 0: smith_required = smith_threshold(session.smith_size, params["smith_exponent"]) smith_votes = sum(1 for v in active_votes if v.voter_is_smith and v.vote_value == "for") smith_ok = smith_votes >= smith_required # TechComm criterion (optional) techcomm_ok = True techcomm_required = None if params.get("techcomm_exponent") is not None and session.techcomm_size > 0: techcomm_required = techcomm_threshold(session.techcomm_size, params["techcomm_exponent"]) techcomm_votes = sum(1 for v in active_votes if v.voter_is_techcomm and v.vote_value == "for") techcomm_ok = techcomm_votes >= techcomm_required adopted = votes_for >= threshold and smith_ok and techcomm_ok vote_result = "adopted" if adopted else "rejected" # Update session tallies session.votes_for = votes_for session.votes_against = votes_against session.votes_total = total session.threshold_required = float(threshold) session.result = vote_result session.status = "tallied" await db.commit() return { "threshold": threshold, "votes_for": votes_for, "votes_against": votes_against, "votes_total": total, "adopted": adopted, "smith_ok": smith_ok, "smith_required": smith_required, "techcomm_ok": techcomm_ok, "techcomm_required": techcomm_required, "result": vote_result, } async def _compute_nuanced( session: VoteSession, active_votes: list[Vote], formula: FormulaConfig, params: dict, db: AsyncSession, ) -> dict: """Compute a nuanced vote result with full per-level breakdown.""" vote_levels = [v.nuanced_level for v in active_votes if v.nuanced_level is not None] threshold_pct = formula.nuanced_threshold_pct or 80 min_participants = formula.nuanced_min_participants or 59 evaluation = evaluate_nuanced( votes=vote_levels, threshold_pct=threshold_pct, min_participants=min_participants, ) vote_result = "adopted" if evaluation["adopted"] else "rejected" session.votes_total = evaluation["total"] session.votes_for = evaluation["positive_count"] session.votes_against = evaluation["total"] - evaluation["positive_count"] session.threshold_required = float(threshold_pct) session.result = vote_result session.status = "tallied" await db.commit() return { "vote_type": "nuanced", "result": vote_result, "nuanced_breakdown": { "per_level_counts": evaluation["per_level_counts"], "positive_count": evaluation["positive_count"], "positive_pct": evaluation["positive_pct"], "threshold_met": evaluation["threshold_met"], "min_participants_met": evaluation["min_participants_met"], "threshold_pct": threshold_pct, "min_participants": min_participants, }, **evaluation, } # ── Close Session ──────────────────────────────────────────────── async def close_session(session_id: uuid.UUID, db: AsyncSession) -> dict: """Close a vote session and compute its final result. Parameters ---------- session_id: UUID of the VoteSession to close. db: Async database session. Returns ------- dict Result dict from compute_result, augmented with close metadata. Raises ------ ValueError If the session is not found or already closed/tallied. """ session = await _load_session_with_votes(db, session_id) if session.status not in ("open",): raise ValueError( f"Impossible de fermer la session {session_id} : statut actuel '{session.status}'" ) # Mark as closed before tallying session.status = "closed" await db.commit() # Compute the final result (this will set status to "tallied") result = await compute_result(session_id, db) return { "closed_at": datetime.now(timezone.utc).isoformat(), **result, } # ── Threshold Details ──────────────────────────────────────────── async def get_threshold_details(session_id: uuid.UUID, db: AsyncSession) -> dict: """Return detailed threshold computation for a vote session. Parameters ---------- session_id: UUID of the VoteSession. db: Async database session. Returns ------- dict Detailed breakdown including wot/smith/techcomm thresholds, pass/fail booleans, participation rate, and formula params. """ session = await _load_session_with_votes(db, session_id) protocol = await _load_protocol_with_formula(db, session.voting_protocol_id) formula: FormulaConfig = protocol.formula_config params = _extract_params(protocol, formula) active_votes: list[Vote] = [v for v in session.votes if v.is_active] votes_for = sum(1 for v in active_votes if v.vote_value == "for") votes_against = sum(1 for v in active_votes if v.vote_value == "against") total = votes_for + votes_against # WoT threshold wot_thresh = wot_threshold( wot_size=session.wot_size, total_votes=total, majority_pct=params.get("majority_pct", 50), base_exponent=params.get("base_exponent", 0.1), gradient_exponent=params.get("gradient_exponent", 0.2), constant_base=params.get("constant_base", 0.0), ) wot_passed = votes_for >= wot_thresh # Smith criterion (optional) smith_thresh = None smith_passed = None if params.get("smith_exponent") is not None and session.smith_size > 0: smith_thresh = smith_threshold(session.smith_size, params["smith_exponent"]) smith_votes = sum(1 for v in active_votes if v.voter_is_smith and v.vote_value == "for") smith_passed = smith_votes >= smith_thresh # TechComm criterion (optional) techcomm_thresh = None techcomm_passed = None if params.get("techcomm_exponent") is not None and session.techcomm_size > 0: techcomm_thresh = techcomm_threshold(session.techcomm_size, params["techcomm_exponent"]) techcomm_votes = sum(1 for v in active_votes if v.voter_is_techcomm and v.vote_value == "for") techcomm_passed = techcomm_votes >= techcomm_thresh # Overall pass: all applicable criteria must pass overall_passed = wot_passed if smith_passed is not None: overall_passed = overall_passed and smith_passed if techcomm_passed is not None: overall_passed = overall_passed and techcomm_passed # Participation rate participation_rate = (total / session.wot_size) if session.wot_size > 0 else 0.0 # Formula params used formula_params = { "M": params.get("majority_pct", 50), "B": params.get("base_exponent", 0.1), "G": params.get("gradient_exponent", 0.2), "C": params.get("constant_base", 0.0), "S": params.get("smith_exponent"), "T": params.get("techcomm_exponent"), } return { "wot_threshold": wot_thresh, "smith_threshold": smith_thresh, "techcomm_threshold": techcomm_thresh, "votes_for": votes_for, "votes_against": votes_against, "votes_total": total, "wot_passed": wot_passed, "smith_passed": smith_passed, "techcomm_passed": techcomm_passed, "overall_passed": overall_passed, "participation_rate": round(participation_rate, 6), "formula_params": formula_params, } # ── Signature Verification ─────────────────────────────────────── async def verify_vote_signature(address: str, signature: str, payload: str) -> bool: """Verify an Ed25519 signature from a Duniter V2 address. Parameters ---------- address: SS58 address of the voter. signature: Hex-encoded Ed25519 signature. payload: The original message that was signed. Returns ------- bool True if the signature is valid. TODO ---- Implement actual Ed25519 verification using substrate-interface: from substrateinterface import Keypair keypair = Keypair(ss58_address=address, crypto_type=KeypairType.ED25519) return keypair.verify(payload.encode(), bytes.fromhex(signature)) """ # TODO: Implement real Ed25519 verification with substrate-interface # For now, accept all signatures in development mode if not address or not signature or not payload: return False return True