Files
decision/backend/app/services/decision_service.py
Yvv 3cb1754592 Sprint 4 : decisions et mandats -- workflow complet + vote integration
Backend: 7 nouveaux endpoints (advance, assign, revoke, create-vote-session),
services enrichis avec creation de sessions de vote, assignation de mandataire
et revocation. 35 nouveaux tests (104 total). Frontend: store mandates, page
cadrage decisions, detail mandats, composants DecisionWorkflow, DecisionCadrage,
DecisionCard, MandateTimeline, MandateCard. Documentation mise a jour.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 14:28:34 +01:00

208 lines
6.0 KiB
Python

"""Decision service: step advancement logic and vote session integration."""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.decision import Decision, DecisionStep
from app.models.protocol import VotingProtocol, FormulaConfig
from app.models.vote import VoteSession
# Valid status transitions for decisions
_DECISION_STATUS_ORDER = [
"draft",
"qualification",
"review",
"voting",
"executed",
"closed",
]
async def advance_decision(decision_id: uuid.UUID, db: AsyncSession) -> Decision:
"""Move a decision to its next step.
Completes the current active step and activates the next pending step.
If no more steps remain, the decision status advances to the next phase.
Parameters
----------
decision_id:
UUID of the Decision to advance.
db:
Async database session.
Returns
-------
Decision
The updated decision.
Raises
------
ValueError
If the decision is not found, or no further advancement is possible.
"""
result = await db.execute(
select(Decision)
.options(selectinload(Decision.steps))
.where(Decision.id == decision_id)
)
decision = result.scalar_one_or_none()
if decision is None:
raise ValueError(f"Decision introuvable : {decision_id}")
if decision.status == "closed":
raise ValueError("La decision est deja cloturee")
steps: list[DecisionStep] = sorted(decision.steps, key=lambda s: s.step_order)
# Find the current active step
active_step: DecisionStep | None = None
for step in steps:
if step.status == "active":
active_step = step
break
if active_step is not None:
# Complete the active step
active_step.status = "completed"
# Activate the next pending step
next_step: DecisionStep | None = None
for step in steps:
if step.step_order > active_step.step_order and step.status == "pending":
next_step = step
break
if next_step is not None:
next_step.status = "active"
else:
# No more steps: advance the decision status
_advance_decision_status(decision)
else:
# No active step: try to activate the first pending step
first_pending: DecisionStep | None = None
for step in steps:
if step.status == "pending":
first_pending = step
break
if first_pending is not None:
first_pending.status = "active"
# Also advance decision out of draft if needed
if decision.status == "draft":
decision.status = "qualification"
else:
# All steps are completed: advance the decision status
_advance_decision_status(decision)
await db.commit()
await db.refresh(decision)
return decision
def _advance_decision_status(decision: Decision) -> None:
"""Move a decision to its next status in the lifecycle."""
try:
current_index = _DECISION_STATUS_ORDER.index(decision.status)
except ValueError:
return
next_index = current_index + 1
if next_index < len(_DECISION_STATUS_ORDER):
decision.status = _DECISION_STATUS_ORDER[next_index]
async def create_vote_session_for_step(
decision_id: uuid.UUID,
step_id: uuid.UUID,
db: AsyncSession,
) -> VoteSession:
"""Create a VoteSession linked to a decision step.
Uses the decision's voting_protocol to configure the session.
Raises ValueError if the decision or step is not found, the step
already has a vote session, or no voting protocol is configured.
Parameters
----------
decision_id:
UUID of the Decision.
step_id:
UUID of the DecisionStep to attach the vote session to.
db:
Async database session.
Returns
-------
VoteSession
The newly created vote session.
"""
# Fetch decision with steps
result = await db.execute(
select(Decision)
.options(selectinload(Decision.steps))
.where(Decision.id == decision_id)
)
decision = result.scalar_one_or_none()
if decision is None:
raise ValueError(f"Decision introuvable : {decision_id}")
# Find the step
step: DecisionStep | None = None
for s in decision.steps:
if s.id == step_id:
step = s
break
if step is None:
raise ValueError(f"Etape introuvable : {step_id}")
# Check step doesn't already have a vote session
if step.vote_session_id is not None:
raise ValueError("Cette etape possede deja une session de vote")
# Check voting protocol is configured
if decision.voting_protocol_id is None:
raise ValueError("Aucun protocole de vote configure pour cette decision")
# Fetch the voting protocol and its formula config for duration
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == decision.voting_protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError("Protocole de vote introuvable")
duration_days = protocol.formula_config.duration_days if protocol.formula_config else 30
now = datetime.now(timezone.utc)
# Create the vote session with explicit UUID (needed before flush)
session_id = uuid.uuid4()
session = VoteSession(
id=session_id,
decision_id=decision.id,
voting_protocol_id=protocol.id,
starts_at=now,
ends_at=now + timedelta(days=duration_days),
status="open",
)
db.add(session)
# Link session to step
step.vote_session_id = session_id
await db.commit()
await db.refresh(session)
return session