Sprint 4 : decisions et mandats -- workflow complet + vote integration

Backend: 7 nouveaux endpoints (advance, assign, revoke, create-vote-session),
services enrichis avec creation de sessions de vote, assignation de mandataire
et revocation. 35 nouveaux tests (104 total). Frontend: store mandates, page
cadrage decisions, detail mandats, composants DecisionWorkflow, DecisionCadrage,
DecisionCard, MandateTimeline, MandateCard. Documentation mise a jour.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Yvv
2026-02-28 14:28:34 +01:00
parent cede2a585f
commit 3cb1754592
24 changed files with 3988 additions and 354 deletions

View File

@@ -13,13 +13,16 @@ from app.database import get_db
from app.models.decision import Decision, DecisionStep
from app.models.user import DuniterIdentity
from app.schemas.decision import (
DecisionAdvanceOut,
DecisionCreate,
DecisionOut,
DecisionStepCreate,
DecisionStepOut,
DecisionUpdate,
)
from app.schemas.vote import VoteSessionOut
from app.services.auth_service import get_current_identity
from app.services.decision_service import advance_decision, create_vote_session_for_step
router = APIRouter()
@@ -141,3 +144,64 @@ async def add_step(
await db.refresh(step)
return DecisionStepOut.model_validate(step)
# ── Workflow routes ────────────────────────────────────────────────────────
@router.post("/{id}/advance", response_model=DecisionAdvanceOut)
async def advance_decision_endpoint(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DecisionAdvanceOut:
"""Advance a decision to its next step or status."""
try:
decision = await advance_decision(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Reload with steps for complete output
decision = await _get_decision(db, decision.id)
data = DecisionOut.model_validate(decision).model_dump()
data["message"] = f"Decision avancee au statut : {decision.status}"
return DecisionAdvanceOut(**data)
@router.post(
"/{id}/steps/{step_id}/create-vote-session",
response_model=VoteSessionOut,
status_code=status.HTTP_201_CREATED,
)
async def create_vote_session_for_step_endpoint(
id: uuid.UUID,
step_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteSessionOut:
"""Create a vote session linked to a decision step."""
try:
session = await create_vote_session_for_step(id, step_id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
return VoteSessionOut.model_validate(session)
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_decision(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> None:
"""Delete a decision (only if in draft status)."""
decision = await _get_decision(db, id)
if decision.status != "draft":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Seules les decisions en brouillon peuvent etre supprimees",
)
await db.delete(decision)
await db.commit()

View File

@@ -13,12 +13,22 @@ from app.database import get_db
from app.models.mandate import Mandate, MandateStep
from app.models.user import DuniterIdentity
from app.schemas.mandate import (
MandateAdvanceOut,
MandateAssignRequest,
MandateCreate,
MandateOut,
MandateStepCreate,
MandateStepOut,
MandateUpdate,
)
from app.schemas.vote import VoteSessionOut
from app.services.auth_service import get_current_identity
from app.services.mandate_service import (
advance_mandate,
assign_mandatee,
create_vote_session_for_step,
revoke_mandate,
)
router = APIRouter()
@@ -95,7 +105,7 @@ async def get_mandate(
@router.put("/{id}", response_model=MandateOut)
async def update_mandate(
id: uuid.UUID,
payload: MandateCreate,
payload: MandateUpdate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateOut:
@@ -165,3 +175,80 @@ async def list_steps(
"""List all steps for a mandate, ordered by step_order."""
mandate = await _get_mandate(db, id)
return [MandateStepOut.model_validate(s) for s in mandate.steps]
# ── Workflow routes ────────────────────────────────────────────────────────
@router.post("/{id}/advance", response_model=MandateAdvanceOut)
async def advance_mandate_endpoint(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateAdvanceOut:
"""Advance a mandate to its next step or status."""
try:
mandate = await advance_mandate(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Reload with steps for complete output
mandate = await _get_mandate(db, mandate.id)
data = MandateOut.model_validate(mandate).model_dump()
data["message"] = f"Mandat avance au statut : {mandate.status}"
return MandateAdvanceOut(**data)
@router.post("/{id}/assign", response_model=MandateOut)
async def assign_mandatee_endpoint(
id: uuid.UUID,
payload: MandateAssignRequest,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateOut:
"""Assign a mandatee to a mandate."""
try:
mandate = await assign_mandatee(id, payload.mandatee_id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Reload with steps
mandate = await _get_mandate(db, mandate.id)
return MandateOut.model_validate(mandate)
@router.post("/{id}/revoke", response_model=MandateOut)
async def revoke_mandate_endpoint(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateOut:
"""Revoke an active mandate."""
try:
mandate = await revoke_mandate(id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
# Reload with steps
mandate = await _get_mandate(db, mandate.id)
return MandateOut.model_validate(mandate)
@router.post(
"/{id}/steps/{step_id}/create-vote-session",
response_model=VoteSessionOut,
status_code=status.HTTP_201_CREATED,
)
async def create_vote_session_for_step_endpoint(
id: uuid.UUID,
step_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteSessionOut:
"""Create a vote session linked to a mandate step."""
try:
session = await create_vote_session_for_step(id, step_id, db)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc))
return VoteSessionOut.model_validate(session)

View File

@@ -70,3 +70,22 @@ class DecisionOut(BaseModel):
created_at: datetime
updated_at: datetime
steps: list[DecisionStepOut] = Field(default_factory=list)
class DecisionAdvanceOut(BaseModel):
"""Output after advancing a decision through its workflow."""
model_config = ConfigDict(from_attributes=True)
id: UUID
title: str
description: str | None = None
context: str | None = None
decision_type: str
status: str
voting_protocol_id: UUID | None = None
created_by_id: UUID | None = None
created_at: datetime
updated_at: datetime
steps: list[DecisionStepOut] = Field(default_factory=list)
message: str = Field(..., description="Message decrivant l'avancement effectue")

View File

@@ -51,6 +51,21 @@ class MandateCreate(BaseModel):
decision_id: UUID | None = None
class MandateUpdate(BaseModel):
"""Partial update for a mandate."""
title: str | None = Field(default=None, max_length=256)
description: str | None = None
mandate_type: str | None = Field(default=None, max_length=64)
decision_id: UUID | None = None
class MandateAssignRequest(BaseModel):
"""Request body for assigning a mandatee to a mandate."""
mandatee_id: UUID = Field(..., description="ID de l'identite Duniter du mandataire")
class MandateOut(BaseModel):
"""Full mandate representation returned by the API."""
@@ -68,3 +83,23 @@ class MandateOut(BaseModel):
created_at: datetime
updated_at: datetime
steps: list[MandateStepOut] = Field(default_factory=list)
class MandateAdvanceOut(BaseModel):
"""Output after advancing a mandate through its workflow."""
model_config = ConfigDict(from_attributes=True)
id: UUID
title: str
description: str | None = None
mandate_type: str
status: str
mandatee_id: UUID | None = None
decision_id: UUID | None = None
starts_at: datetime | None = None
ends_at: datetime | None = None
created_at: datetime
updated_at: datetime
steps: list[MandateStepOut] = Field(default_factory=list)
message: str = Field(..., description="Message decrivant l'avancement effectue")

View File

@@ -1,14 +1,17 @@
"""Decision service: step advancement logic."""
"""Decision service: step advancement logic and vote session integration."""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.decision import Decision, DecisionStep
from app.models.protocol import VotingProtocol, FormulaConfig
from app.models.vote import VoteSession
# Valid status transitions for decisions
@@ -115,3 +118,90 @@ def _advance_decision_status(decision: Decision) -> None:
next_index = current_index + 1
if next_index < len(_DECISION_STATUS_ORDER):
decision.status = _DECISION_STATUS_ORDER[next_index]
async def create_vote_session_for_step(
decision_id: uuid.UUID,
step_id: uuid.UUID,
db: AsyncSession,
) -> VoteSession:
"""Create a VoteSession linked to a decision step.
Uses the decision's voting_protocol to configure the session.
Raises ValueError if the decision or step is not found, the step
already has a vote session, or no voting protocol is configured.
Parameters
----------
decision_id:
UUID of the Decision.
step_id:
UUID of the DecisionStep to attach the vote session to.
db:
Async database session.
Returns
-------
VoteSession
The newly created vote session.
"""
# Fetch decision with steps
result = await db.execute(
select(Decision)
.options(selectinload(Decision.steps))
.where(Decision.id == decision_id)
)
decision = result.scalar_one_or_none()
if decision is None:
raise ValueError(f"Decision introuvable : {decision_id}")
# Find the step
step: DecisionStep | None = None
for s in decision.steps:
if s.id == step_id:
step = s
break
if step is None:
raise ValueError(f"Etape introuvable : {step_id}")
# Check step doesn't already have a vote session
if step.vote_session_id is not None:
raise ValueError("Cette etape possede deja une session de vote")
# Check voting protocol is configured
if decision.voting_protocol_id is None:
raise ValueError("Aucun protocole de vote configure pour cette decision")
# Fetch the voting protocol and its formula config for duration
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == decision.voting_protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError("Protocole de vote introuvable")
duration_days = protocol.formula_config.duration_days if protocol.formula_config else 30
now = datetime.now(timezone.utc)
# Create the vote session with explicit UUID (needed before flush)
session_id = uuid.uuid4()
session = VoteSession(
id=session_id,
decision_id=decision.id,
voting_protocol_id=protocol.id,
starts_at=now,
ends_at=now + timedelta(days=duration_days),
status="open",
)
db.add(session)
# Link session to step
step.vote_session_id = session_id
await db.commit()
await db.refresh(session)
return session

View File

@@ -1,14 +1,18 @@
"""Mandate service: step advancement logic."""
"""Mandate service: step advancement logic, assignment and revocation."""
from __future__ import annotations
import uuid
from datetime import datetime, timedelta, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.mandate import Mandate, MandateStep
from app.models.protocol import VotingProtocol
from app.models.user import DuniterIdentity
from app.models.vote import VoteSession
# Valid status transitions for mandates
@@ -116,3 +120,214 @@ def _advance_mandate_status(mandate: Mandate) -> None:
next_index = current_index + 1
if next_index < len(_MANDATE_STATUS_ORDER):
mandate.status = _MANDATE_STATUS_ORDER[next_index]
async def assign_mandatee(
mandate_id: uuid.UUID,
mandatee_id: uuid.UUID,
db: AsyncSession,
) -> Mandate:
"""Assign a mandatee (DuniterIdentity) to a mandate.
The mandate must be in a state that accepts assignment (not completed/revoked).
The mandatee must exist in the duniter_identities table.
Parameters
----------
mandate_id:
UUID of the Mandate.
mandatee_id:
UUID of the DuniterIdentity to assign.
db:
Async database session.
Returns
-------
Mandate
The updated mandate.
Raises
------
ValueError
If the mandate or mandatee is not found, or the mandate
is in a terminal state.
"""
result = await db.execute(
select(Mandate)
.options(selectinload(Mandate.steps))
.where(Mandate.id == mandate_id)
)
mandate = result.scalar_one_or_none()
if mandate is None:
raise ValueError(f"Mandat introuvable : {mandate_id}")
if mandate.status in ("completed", "revoked"):
raise ValueError(f"Impossible d'assigner un mandataire : le mandat est en statut terminal ({mandate.status})")
# Verify mandatee exists
identity_result = await db.execute(
select(DuniterIdentity).where(DuniterIdentity.id == mandatee_id)
)
identity = identity_result.scalar_one_or_none()
if identity is None:
raise ValueError(f"Identite Duniter introuvable : {mandatee_id}")
mandate.mandatee_id = mandatee_id
mandate.starts_at = datetime.now(timezone.utc)
await db.commit()
await db.refresh(mandate)
return mandate
async def revoke_mandate(
mandate_id: uuid.UUID,
db: AsyncSession,
) -> Mandate:
"""Revoke an active mandate.
Sets the mandate status to 'revoked' and cancels any active/pending steps.
Parameters
----------
mandate_id:
UUID of the Mandate to revoke.
db:
Async database session.
Returns
-------
Mandate
The updated mandate.
Raises
------
ValueError
If the mandate is not found or already in a terminal state.
"""
result = await db.execute(
select(Mandate)
.options(selectinload(Mandate.steps))
.where(Mandate.id == mandate_id)
)
mandate = result.scalar_one_or_none()
if mandate is None:
raise ValueError(f"Mandat introuvable : {mandate_id}")
if mandate.status in ("completed", "revoked"):
raise ValueError(f"Le mandat est deja en statut terminal : {mandate.status}")
# Cancel active and pending steps
for step in mandate.steps:
if step.status in ("active", "pending"):
step.status = "cancelled"
mandate.status = "revoked"
mandate.ends_at = datetime.now(timezone.utc)
await db.commit()
await db.refresh(mandate)
return mandate
async def create_vote_session_for_step(
mandate_id: uuid.UUID,
step_id: uuid.UUID,
db: AsyncSession,
) -> VoteSession:
"""Create a VoteSession linked to a mandate step.
Uses the mandate's linked decision's voting_protocol to configure
the session, or requires a voting_protocol_id on the mandate's decision.
Parameters
----------
mandate_id:
UUID of the Mandate.
step_id:
UUID of the MandateStep to attach the vote session to.
db:
Async database session.
Returns
-------
VoteSession
The newly created vote session.
Raises
------
ValueError
If the mandate, step, or protocol is not found, or the step
already has a vote session.
"""
# Fetch mandate with steps
result = await db.execute(
select(Mandate)
.options(selectinload(Mandate.steps))
.where(Mandate.id == mandate_id)
)
mandate = result.scalar_one_or_none()
if mandate is None:
raise ValueError(f"Mandat introuvable : {mandate_id}")
# Find the step
step: MandateStep | None = None
for s in mandate.steps:
if s.id == step_id:
step = s
break
if step is None:
raise ValueError(f"Etape introuvable : {step_id}")
# Check step doesn't already have a vote session
if step.vote_session_id is not None:
raise ValueError("Cette etape possede deja une session de vote")
# Resolve the voting protocol via the linked decision
if mandate.decision_id is None:
raise ValueError("Aucune decision liee a ce mandat pour determiner le protocole de vote")
from app.models.decision import Decision
decision_result = await db.execute(
select(Decision).where(Decision.id == mandate.decision_id)
)
decision = decision_result.scalar_one_or_none()
if decision is None or decision.voting_protocol_id is None:
raise ValueError("Aucun protocole de vote configure pour la decision liee")
# Fetch the voting protocol and its formula config for duration
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == decision.voting_protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError("Protocole de vote introuvable")
duration_days = protocol.formula_config.duration_days if protocol.formula_config else 30
now = datetime.now(timezone.utc)
# Create the vote session with explicit UUID (needed before flush)
session_id = uuid.uuid4()
session = VoteSession(
id=session_id,
decision_id=mandate.decision_id,
voting_protocol_id=protocol.id,
starts_at=now,
ends_at=now + timedelta(days=duration_days),
status="open",
)
db.add(session)
# Link session to step
step.vote_session_id = session_id
await db.commit()
await db.refresh(session)
return session

View File

@@ -0,0 +1,386 @@
"""Tests for decision service: advance_decision, create_vote_session_for_step.
These are pure unit tests that mock the database layer to test
the service logic in isolation.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
sqlalchemy = pytest.importorskip("sqlalchemy", reason="sqlalchemy required for decision service tests")
from app.services.decision_service import ( # noqa: E402
advance_decision,
create_vote_session_for_step,
)
# ---------------------------------------------------------------------------
# Helpers: mock objects that behave like SQLAlchemy models
# ---------------------------------------------------------------------------
def _make_step(
step_id: uuid.UUID | None = None,
decision_id: uuid.UUID | None = None,
step_order: int = 0,
step_type: str = "qualification",
status: str = "pending",
vote_session_id: uuid.UUID | None = None,
) -> MagicMock:
"""Create a mock DecisionStep."""
step = MagicMock()
step.id = step_id or uuid.uuid4()
step.decision_id = decision_id or uuid.uuid4()
step.step_order = step_order
step.step_type = step_type
step.title = None
step.description = None
step.status = status
step.vote_session_id = vote_session_id
step.outcome = None
step.created_at = datetime.now(timezone.utc)
return step
def _make_decision(
decision_id: uuid.UUID | None = None,
title: str = "Decision de test",
decision_type: str = "document_change",
status: str = "draft",
voting_protocol_id: uuid.UUID | None = None,
steps: list | None = None,
) -> MagicMock:
"""Create a mock Decision."""
decision = MagicMock()
decision.id = decision_id or uuid.uuid4()
decision.title = title
decision.description = None
decision.context = None
decision.decision_type = decision_type
decision.status = status
decision.voting_protocol_id = voting_protocol_id
decision.created_by_id = None
decision.created_at = datetime.now(timezone.utc)
decision.updated_at = datetime.now(timezone.utc)
decision.steps = steps or []
return decision
def _make_protocol(
protocol_id: uuid.UUID | None = None,
duration_days: int = 30,
) -> MagicMock:
"""Create a mock VotingProtocol with formula_config."""
protocol = MagicMock()
protocol.id = protocol_id or uuid.uuid4()
protocol.name = "Protocole standard"
protocol.vote_type = "binary"
protocol.formula_config = MagicMock()
protocol.formula_config.duration_days = duration_days
return protocol
def _make_db_for_advance(decision: MagicMock | None) -> AsyncMock:
"""Create a mock async database session for advance_decision."""
db = AsyncMock()
result = MagicMock()
result.scalar_one_or_none.return_value = decision
db.execute = AsyncMock(return_value=result)
db.commit = AsyncMock()
db.refresh = AsyncMock()
return db
def _make_db_for_vote_session(
decision: MagicMock | None,
protocol: MagicMock | None = None,
) -> AsyncMock:
"""Create a mock async database session for create_vote_session_for_step.
Sequential execute calls:
1st -> decision lookup
2nd -> protocol lookup
"""
db = AsyncMock()
call_results = []
# Decision result
decision_result = MagicMock()
decision_result.scalar_one_or_none.return_value = decision
call_results.append(decision_result)
# Protocol result
if protocol is not None:
proto_result = MagicMock()
proto_result.scalar_one_or_none.return_value = protocol
call_results.append(proto_result)
db.execute = AsyncMock(side_effect=call_results)
db.commit = AsyncMock()
db.refresh = AsyncMock()
db.flush = AsyncMock()
db.add = MagicMock()
return db
# ---------------------------------------------------------------------------
# Tests: advance_decision
# ---------------------------------------------------------------------------
class TestAdvanceDecision:
"""Test decision_service.advance_decision."""
@pytest.mark.asyncio
async def test_advance_from_draft_activates_first_step(self):
"""Advancing a draft decision with pending steps activates the first one
and moves decision to qualification."""
decision_id = uuid.uuid4()
step1 = _make_step(decision_id=decision_id, step_order=0, status="pending")
step2 = _make_step(decision_id=decision_id, step_order=1, status="pending")
decision = _make_decision(
decision_id=decision_id,
status="draft",
steps=[step1, step2],
)
db = _make_db_for_advance(decision)
result = await advance_decision(decision_id, db)
assert step1.status == "active"
assert step2.status == "pending"
assert decision.status == "qualification"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_completes_active_and_activates_next(self):
"""Advancing completes the current active step and activates the next pending one."""
decision_id = uuid.uuid4()
step1 = _make_step(decision_id=decision_id, step_order=0, status="active")
step2 = _make_step(decision_id=decision_id, step_order=1, status="pending")
decision = _make_decision(
decision_id=decision_id,
status="qualification",
steps=[step1, step2],
)
db = _make_db_for_advance(decision)
result = await advance_decision(decision_id, db)
assert step1.status == "completed"
assert step2.status == "active"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_last_step_advances_status(self):
"""When the last active step is completed with no more pending steps,
the decision status advances."""
decision_id = uuid.uuid4()
step1 = _make_step(decision_id=decision_id, step_order=0, status="completed")
step2 = _make_step(decision_id=decision_id, step_order=1, status="active")
decision = _make_decision(
decision_id=decision_id,
status="qualification",
steps=[step1, step2],
)
db = _make_db_for_advance(decision)
result = await advance_decision(decision_id, db)
assert step2.status == "completed"
assert decision.status == "review"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_no_steps_advances_status(self):
"""A decision with no steps advances its status directly."""
decision_id = uuid.uuid4()
decision = _make_decision(
decision_id=decision_id,
status="draft",
steps=[],
)
db = _make_db_for_advance(decision)
result = await advance_decision(decision_id, db)
assert decision.status == "qualification"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_full_progression(self):
"""A decision with no steps can progress through all statuses."""
decision_id = uuid.uuid4()
decision = _make_decision(
decision_id=decision_id,
status="draft",
steps=[],
)
statuses_seen = [decision.status]
db = _make_db_for_advance(decision)
# Advance through all states until closed
for _ in range(10): # safety limit
# Recreate db mock for each call (side_effect is consumed)
db = _make_db_for_advance(decision)
await advance_decision(decision_id, db)
statuses_seen.append(decision.status)
if decision.status == "closed":
break
assert statuses_seen == [
"draft",
"qualification",
"review",
"voting",
"executed",
"closed",
]
@pytest.mark.asyncio
async def test_advance_closed_raises_error(self):
"""Advancing a closed decision raises ValueError."""
decision_id = uuid.uuid4()
decision = _make_decision(
decision_id=decision_id,
status="closed",
steps=[],
)
db = _make_db_for_advance(decision)
with pytest.raises(ValueError, match="deja cloturee"):
await advance_decision(decision_id, db)
@pytest.mark.asyncio
async def test_advance_not_found_raises_error(self):
"""Advancing a non-existent decision raises ValueError."""
db = _make_db_for_advance(None)
with pytest.raises(ValueError, match="Decision introuvable"):
await advance_decision(uuid.uuid4(), db)
# ---------------------------------------------------------------------------
# Tests: create_vote_session_for_step
# ---------------------------------------------------------------------------
class TestCreateVoteSessionForStep:
"""Test decision_service.create_vote_session_for_step."""
@pytest.mark.asyncio
async def test_create_vote_session_success(self):
"""A vote session is created and linked to the step."""
decision_id = uuid.uuid4()
step_id = uuid.uuid4()
protocol_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
decision_id=decision_id,
step_type="vote",
status="active",
vote_session_id=None,
)
decision = _make_decision(
decision_id=decision_id,
status="voting",
voting_protocol_id=protocol_id,
steps=[step],
)
protocol = _make_protocol(protocol_id=protocol_id, duration_days=14)
db = _make_db_for_vote_session(decision, protocol)
result = await create_vote_session_for_step(decision_id, step_id, db)
# The step should now have a vote_session_id set
assert step.vote_session_id is not None
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_create_vote_session_decision_not_found(self):
"""ValueError when decision does not exist."""
db = _make_db_for_vote_session(None)
with pytest.raises(ValueError, match="Decision introuvable"):
await create_vote_session_for_step(uuid.uuid4(), uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_create_vote_session_step_not_found(self):
"""ValueError when step does not belong to the decision."""
decision_id = uuid.uuid4()
decision = _make_decision(decision_id=decision_id, steps=[])
db = _make_db_for_vote_session(decision)
with pytest.raises(ValueError, match="Etape introuvable"):
await create_vote_session_for_step(decision_id, uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_create_vote_session_step_already_has_session(self):
"""ValueError when step already has a vote session."""
decision_id = uuid.uuid4()
step_id = uuid.uuid4()
existing_session_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
decision_id=decision_id,
vote_session_id=existing_session_id,
)
decision = _make_decision(
decision_id=decision_id,
voting_protocol_id=uuid.uuid4(),
steps=[step],
)
db = _make_db_for_vote_session(decision)
with pytest.raises(ValueError, match="deja une session de vote"):
await create_vote_session_for_step(decision_id, step_id, db)
@pytest.mark.asyncio
async def test_create_vote_session_no_protocol(self):
"""ValueError when decision has no voting protocol configured."""
decision_id = uuid.uuid4()
step_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
decision_id=decision_id,
vote_session_id=None,
)
decision = _make_decision(
decision_id=decision_id,
voting_protocol_id=None,
steps=[step],
)
db = _make_db_for_vote_session(decision)
with pytest.raises(ValueError, match="Aucun protocole de vote"):
await create_vote_session_for_step(decision_id, step_id, db)

View File

@@ -0,0 +1,655 @@
"""Tests for mandate service: advance_mandate, assign_mandatee, revoke_mandate, create_vote_session_for_step.
These are pure unit tests that mock the database layer to test
the service logic in isolation.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
import pytest
sqlalchemy = pytest.importorskip("sqlalchemy", reason="sqlalchemy required for mandate service tests")
from app.services.mandate_service import ( # noqa: E402
advance_mandate,
assign_mandatee,
create_vote_session_for_step,
revoke_mandate,
)
# ---------------------------------------------------------------------------
# Helpers: mock objects that behave like SQLAlchemy models
# ---------------------------------------------------------------------------
def _make_step(
step_id: uuid.UUID | None = None,
mandate_id: uuid.UUID | None = None,
step_order: int = 0,
step_type: str = "formulation",
status: str = "pending",
vote_session_id: uuid.UUID | None = None,
) -> MagicMock:
"""Create a mock MandateStep."""
step = MagicMock()
step.id = step_id or uuid.uuid4()
step.mandate_id = mandate_id or uuid.uuid4()
step.step_order = step_order
step.step_type = step_type
step.title = None
step.description = None
step.status = status
step.vote_session_id = vote_session_id
step.outcome = None
step.created_at = datetime.now(timezone.utc)
return step
def _make_mandate(
mandate_id: uuid.UUID | None = None,
title: str = "Mandat de test",
mandate_type: str = "techcomm",
status: str = "draft",
mandatee_id: uuid.UUID | None = None,
decision_id: uuid.UUID | None = None,
steps: list | None = None,
) -> MagicMock:
"""Create a mock Mandate."""
mandate = MagicMock()
mandate.id = mandate_id or uuid.uuid4()
mandate.title = title
mandate.description = None
mandate.mandate_type = mandate_type
mandate.status = status
mandate.mandatee_id = mandatee_id
mandate.decision_id = decision_id
mandate.starts_at = None
mandate.ends_at = None
mandate.created_at = datetime.now(timezone.utc)
mandate.updated_at = datetime.now(timezone.utc)
mandate.steps = steps or []
return mandate
def _make_identity(identity_id: uuid.UUID | None = None) -> MagicMock:
"""Create a mock DuniterIdentity."""
identity = MagicMock()
identity.id = identity_id or uuid.uuid4()
identity.address = "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY"
identity.username = "Alice"
return identity
def _make_protocol(
protocol_id: uuid.UUID | None = None,
duration_days: int = 30,
) -> MagicMock:
"""Create a mock VotingProtocol with formula_config."""
protocol = MagicMock()
protocol.id = protocol_id or uuid.uuid4()
protocol.name = "Protocole standard"
protocol.vote_type = "binary"
protocol.formula_config = MagicMock()
protocol.formula_config.duration_days = duration_days
return protocol
def _make_decision_mock(
decision_id: uuid.UUID | None = None,
voting_protocol_id: uuid.UUID | None = None,
) -> MagicMock:
"""Create a mock Decision for the vote session lookup."""
decision = MagicMock()
decision.id = decision_id or uuid.uuid4()
decision.voting_protocol_id = voting_protocol_id
return decision
def _make_db_for_advance(mandate: MagicMock | None) -> AsyncMock:
"""Create a mock async database session for advance_mandate."""
db = AsyncMock()
result = MagicMock()
result.scalar_one_or_none.return_value = mandate
db.execute = AsyncMock(return_value=result)
db.commit = AsyncMock()
db.refresh = AsyncMock()
return db
def _make_db_for_assign(
mandate: MagicMock | None,
identity: MagicMock | None = None,
) -> AsyncMock:
"""Create a mock async database session for assign_mandatee.
Sequential execute calls:
1st -> mandate lookup
2nd -> identity lookup
"""
db = AsyncMock()
call_results = []
# Mandate result
mandate_result = MagicMock()
mandate_result.scalar_one_or_none.return_value = mandate
call_results.append(mandate_result)
# Identity result
if identity is not None or mandate is not None:
identity_result = MagicMock()
identity_result.scalar_one_or_none.return_value = identity
call_results.append(identity_result)
db.execute = AsyncMock(side_effect=call_results)
db.commit = AsyncMock()
db.refresh = AsyncMock()
return db
def _make_db_for_revoke(mandate: MagicMock | None) -> AsyncMock:
"""Create a mock async database session for revoke_mandate."""
db = AsyncMock()
result = MagicMock()
result.scalar_one_or_none.return_value = mandate
db.execute = AsyncMock(return_value=result)
db.commit = AsyncMock()
db.refresh = AsyncMock()
return db
def _make_db_for_vote_session(
mandate: MagicMock | None,
decision: MagicMock | None = None,
protocol: MagicMock | None = None,
) -> AsyncMock:
"""Create a mock async database session for create_vote_session_for_step.
Sequential execute calls:
1st -> mandate lookup
2nd -> decision lookup
3rd -> protocol lookup
"""
db = AsyncMock()
call_results = []
# Mandate result
mandate_result = MagicMock()
mandate_result.scalar_one_or_none.return_value = mandate
call_results.append(mandate_result)
# Decision result
if decision is not None:
decision_result = MagicMock()
decision_result.scalar_one_or_none.return_value = decision
call_results.append(decision_result)
# Protocol result
if protocol is not None:
proto_result = MagicMock()
proto_result.scalar_one_or_none.return_value = protocol
call_results.append(proto_result)
db.execute = AsyncMock(side_effect=call_results)
db.commit = AsyncMock()
db.refresh = AsyncMock()
db.flush = AsyncMock()
db.add = MagicMock()
return db
# ---------------------------------------------------------------------------
# Tests: advance_mandate
# ---------------------------------------------------------------------------
class TestAdvanceMandate:
"""Test mandate_service.advance_mandate."""
@pytest.mark.asyncio
async def test_advance_from_draft_activates_first_step(self):
"""Advancing a draft mandate with pending steps activates the first one
and moves mandate to candidacy."""
mandate_id = uuid.uuid4()
step1 = _make_step(mandate_id=mandate_id, step_order=0, status="pending")
step2 = _make_step(mandate_id=mandate_id, step_order=1, status="pending")
mandate = _make_mandate(
mandate_id=mandate_id,
status="draft",
steps=[step1, step2],
)
db = _make_db_for_advance(mandate)
result = await advance_mandate(mandate_id, db)
assert step1.status == "active"
assert step2.status == "pending"
assert mandate.status == "candidacy"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_completes_active_and_activates_next(self):
"""Advancing completes the current active step and activates the next pending one."""
mandate_id = uuid.uuid4()
step1 = _make_step(mandate_id=mandate_id, step_order=0, status="active")
step2 = _make_step(mandate_id=mandate_id, step_order=1, status="pending")
mandate = _make_mandate(
mandate_id=mandate_id,
status="candidacy",
steps=[step1, step2],
)
db = _make_db_for_advance(mandate)
result = await advance_mandate(mandate_id, db)
assert step1.status == "completed"
assert step2.status == "active"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_last_step_advances_status(self):
"""When the last active step is completed with no more pending steps,
the mandate status advances."""
mandate_id = uuid.uuid4()
step1 = _make_step(mandate_id=mandate_id, step_order=0, status="completed")
step2 = _make_step(mandate_id=mandate_id, step_order=1, status="active")
mandate = _make_mandate(
mandate_id=mandate_id,
status="candidacy",
steps=[step1, step2],
)
db = _make_db_for_advance(mandate)
result = await advance_mandate(mandate_id, db)
assert step2.status == "completed"
assert mandate.status == "voting"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_advance_full_progression(self):
"""A mandate with no steps can progress through all statuses."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="draft",
steps=[],
)
statuses_seen = [mandate.status]
# Advance through all states until completed
for _ in range(10): # safety limit
db = _make_db_for_advance(mandate)
await advance_mandate(mandate_id, db)
statuses_seen.append(mandate.status)
if mandate.status == "completed":
break
assert statuses_seen == [
"draft",
"candidacy",
"voting",
"active",
"reporting",
"completed",
]
@pytest.mark.asyncio
async def test_advance_completed_raises_error(self):
"""Advancing a completed mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="completed",
steps=[],
)
db = _make_db_for_advance(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await advance_mandate(mandate_id, db)
@pytest.mark.asyncio
async def test_advance_revoked_raises_error(self):
"""Advancing a revoked mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="revoked",
steps=[],
)
db = _make_db_for_advance(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await advance_mandate(mandate_id, db)
@pytest.mark.asyncio
async def test_advance_not_found_raises_error(self):
"""Advancing a non-existent mandate raises ValueError."""
db = _make_db_for_advance(None)
with pytest.raises(ValueError, match="Mandat introuvable"):
await advance_mandate(uuid.uuid4(), db)
# ---------------------------------------------------------------------------
# Tests: assign_mandatee
# ---------------------------------------------------------------------------
class TestAssignMandatee:
"""Test mandate_service.assign_mandatee."""
@pytest.mark.asyncio
async def test_assign_success(self):
"""Assigning a valid identity to an active mandate succeeds."""
mandate_id = uuid.uuid4()
mandatee_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="active",
)
identity = _make_identity(identity_id=mandatee_id)
db = _make_db_for_assign(mandate, identity)
result = await assign_mandatee(mandate_id, mandatee_id, db)
assert mandate.mandatee_id == mandatee_id
assert mandate.starts_at is not None
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_assign_draft_mandate(self):
"""Assigning to a draft mandate is allowed."""
mandate_id = uuid.uuid4()
mandatee_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="draft",
)
identity = _make_identity(identity_id=mandatee_id)
db = _make_db_for_assign(mandate, identity)
result = await assign_mandatee(mandate_id, mandatee_id, db)
assert mandate.mandatee_id == mandatee_id
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_assign_not_found_raises_error(self):
"""Assigning to a non-existent mandate raises ValueError."""
db = _make_db_for_assign(None)
with pytest.raises(ValueError, match="Mandat introuvable"):
await assign_mandatee(uuid.uuid4(), uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_assign_completed_raises_error(self):
"""Assigning to a completed mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="completed",
)
db = _make_db_for_assign(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await assign_mandatee(mandate_id, uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_assign_revoked_raises_error(self):
"""Assigning to a revoked mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="revoked",
)
db = _make_db_for_assign(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await assign_mandatee(mandate_id, uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_assign_identity_not_found_raises_error(self):
"""Assigning a non-existent identity raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="active",
)
db = _make_db_for_assign(mandate, identity=None)
with pytest.raises(ValueError, match="Identite Duniter introuvable"):
await assign_mandatee(mandate_id, uuid.uuid4(), db)
# ---------------------------------------------------------------------------
# Tests: revoke_mandate
# ---------------------------------------------------------------------------
class TestRevokeMandate:
"""Test mandate_service.revoke_mandate."""
@pytest.mark.asyncio
async def test_revoke_active_mandate(self):
"""Revoking an active mandate sets status to revoked and cancels steps."""
mandate_id = uuid.uuid4()
step1 = _make_step(mandate_id=mandate_id, step_order=0, status="completed")
step2 = _make_step(mandate_id=mandate_id, step_order=1, status="active")
step3 = _make_step(mandate_id=mandate_id, step_order=2, status="pending")
mandate = _make_mandate(
mandate_id=mandate_id,
status="active",
steps=[step1, step2, step3],
)
db = _make_db_for_revoke(mandate)
result = await revoke_mandate(mandate_id, db)
assert mandate.status == "revoked"
assert mandate.ends_at is not None
# Completed steps stay completed
assert step1.status == "completed"
# Active and pending steps are cancelled
assert step2.status == "cancelled"
assert step3.status == "cancelled"
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_revoke_draft_mandate(self):
"""Revoking a draft mandate is allowed."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="draft",
steps=[],
)
db = _make_db_for_revoke(mandate)
result = await revoke_mandate(mandate_id, db)
assert mandate.status == "revoked"
assert mandate.ends_at is not None
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_revoke_not_found_raises_error(self):
"""Revoking a non-existent mandate raises ValueError."""
db = _make_db_for_revoke(None)
with pytest.raises(ValueError, match="Mandat introuvable"):
await revoke_mandate(uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_revoke_completed_raises_error(self):
"""Revoking an already completed mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="completed",
)
db = _make_db_for_revoke(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await revoke_mandate(mandate_id, db)
@pytest.mark.asyncio
async def test_revoke_already_revoked_raises_error(self):
"""Revoking an already revoked mandate raises ValueError."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(
mandate_id=mandate_id,
status="revoked",
)
db = _make_db_for_revoke(mandate)
with pytest.raises(ValueError, match="statut terminal"):
await revoke_mandate(mandate_id, db)
# ---------------------------------------------------------------------------
# Tests: create_vote_session_for_step
# ---------------------------------------------------------------------------
class TestCreateVoteSessionForStep:
"""Test mandate_service.create_vote_session_for_step."""
@pytest.mark.asyncio
async def test_create_vote_session_success(self):
"""A vote session is created and linked to the mandate step."""
mandate_id = uuid.uuid4()
step_id = uuid.uuid4()
decision_id = uuid.uuid4()
protocol_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
mandate_id=mandate_id,
step_type="vote",
status="active",
vote_session_id=None,
)
mandate = _make_mandate(
mandate_id=mandate_id,
status="voting",
decision_id=decision_id,
steps=[step],
)
decision = _make_decision_mock(
decision_id=decision_id,
voting_protocol_id=protocol_id,
)
protocol = _make_protocol(protocol_id=protocol_id, duration_days=14)
db = _make_db_for_vote_session(mandate, decision, protocol)
result = await create_vote_session_for_step(mandate_id, step_id, db)
# The step should now have a vote_session_id set
assert step.vote_session_id is not None
db.commit.assert_awaited_once()
@pytest.mark.asyncio
async def test_create_vote_session_mandate_not_found(self):
"""ValueError when mandate does not exist."""
db = _make_db_for_vote_session(None)
with pytest.raises(ValueError, match="Mandat introuvable"):
await create_vote_session_for_step(uuid.uuid4(), uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_create_vote_session_step_not_found(self):
"""ValueError when step does not belong to the mandate."""
mandate_id = uuid.uuid4()
mandate = _make_mandate(mandate_id=mandate_id, decision_id=uuid.uuid4(), steps=[])
db = _make_db_for_vote_session(mandate)
with pytest.raises(ValueError, match="Etape introuvable"):
await create_vote_session_for_step(mandate_id, uuid.uuid4(), db)
@pytest.mark.asyncio
async def test_create_vote_session_step_already_has_session(self):
"""ValueError when step already has a vote session."""
mandate_id = uuid.uuid4()
step_id = uuid.uuid4()
existing_session_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
mandate_id=mandate_id,
vote_session_id=existing_session_id,
)
mandate = _make_mandate(
mandate_id=mandate_id,
decision_id=uuid.uuid4(),
steps=[step],
)
db = _make_db_for_vote_session(mandate)
with pytest.raises(ValueError, match="deja une session de vote"):
await create_vote_session_for_step(mandate_id, step_id, db)
@pytest.mark.asyncio
async def test_create_vote_session_no_decision(self):
"""ValueError when mandate has no linked decision."""
mandate_id = uuid.uuid4()
step_id = uuid.uuid4()
step = _make_step(
step_id=step_id,
mandate_id=mandate_id,
vote_session_id=None,
)
mandate = _make_mandate(
mandate_id=mandate_id,
decision_id=None,
steps=[step],
)
db = _make_db_for_vote_session(mandate)
with pytest.raises(ValueError, match="Aucune decision liee"):
await create_vote_session_for_step(mandate_id, step_id, db)