Backend: 7 nouveaux endpoints (advance, assign, revoke, create-vote-session), services enrichis avec creation de sessions de vote, assignation de mandataire et revocation. 35 nouveaux tests (104 total). Frontend: store mandates, page cadrage decisions, detail mandats, composants DecisionWorkflow, DecisionCadrage, DecisionCard, MandateTimeline, MandateCard. Documentation mise a jour. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
387 lines
12 KiB
Python
387 lines
12 KiB
Python
"""Tests for decision service: advance_decision, create_vote_session_for_step.
|
|
|
|
These are pure unit tests that mock the database layer to test
|
|
the service logic in isolation.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
sqlalchemy = pytest.importorskip("sqlalchemy", reason="sqlalchemy required for decision service tests")
|
|
|
|
from app.services.decision_service import ( # noqa: E402
|
|
advance_decision,
|
|
create_vote_session_for_step,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers: mock objects that behave like SQLAlchemy models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_step(
    step_id: uuid.UUID | None = None,
    decision_id: uuid.UUID | None = None,
    step_order: int = 0,
    step_type: str = "qualification",
    status: str = "pending",
    vote_session_id: uuid.UUID | None = None,
) -> MagicMock:
    """Create a mock DecisionStep.

    Args:
        step_id: Value for ``id``; a random UUID is generated when ``None``.
        decision_id: Value for ``decision_id``; a random UUID is generated
            when ``None``.
        step_order: Value for ``step_order``.
        step_type: Value for ``step_type``.
        status: Value for ``status``.
        vote_session_id: Value for ``vote_session_id`` (``None`` means the
            step has no vote session attached).

    Returns:
        A ``MagicMock`` carrying the attributes above, with ``title``,
        ``description`` and ``outcome`` set to ``None`` and ``created_at``
        set to the current UTC time.
    """
    step = MagicMock()
    # Explicit ``is None`` checks: only a missing id triggers generation.
    step.id = uuid.uuid4() if step_id is None else step_id
    step.decision_id = uuid.uuid4() if decision_id is None else decision_id
    step.step_order = step_order
    step.step_type = step_type
    step.title = None
    step.description = None
    step.status = status
    step.vote_session_id = vote_session_id
    step.outcome = None
    step.created_at = datetime.now(timezone.utc)
    return step
|
|
|
|
|
|
def _make_decision(
    decision_id: uuid.UUID | None = None,
    title: str = "Decision de test",
    decision_type: str = "document_change",
    status: str = "draft",
    voting_protocol_id: uuid.UUID | None = None,
    steps: list | None = None,
) -> MagicMock:
    """Create a mock Decision.

    Args:
        decision_id: Value for ``id``; a random UUID is generated when ``None``.
        title: Value for ``title``.
        decision_type: Value for ``decision_type``.
        status: Value for ``status``.
        voting_protocol_id: Value for ``voting_protocol_id`` (``None`` means
            no protocol is configured).
        steps: Step mocks exposed as ``steps``; an empty list when ``None``.

    Returns:
        A ``MagicMock`` carrying the attributes above, with ``description``,
        ``context`` and ``created_by_id`` set to ``None`` and both timestamps
        set to the same current UTC time.
    """
    # One timestamp for both fields so a fresh mock has created_at == updated_at.
    now = datetime.now(timezone.utc)

    decision = MagicMock()
    decision.id = uuid.uuid4() if decision_id is None else decision_id
    decision.title = title
    decision.description = None
    decision.context = None
    decision.decision_type = decision_type
    decision.status = status
    decision.voting_protocol_id = voting_protocol_id
    decision.created_by_id = None
    decision.created_at = now
    decision.updated_at = now
    decision.steps = [] if steps is None else steps
    return decision
|
|
|
|
|
|
def _make_protocol(
    protocol_id: uuid.UUID | None = None,
    duration_days: int = 30,
) -> MagicMock:
    """Create a mock VotingProtocol with formula_config.

    Args:
        protocol_id: Value for ``id``; a random UUID is generated when ``None``.
        duration_days: Value exposed as ``formula_config.duration_days``.

    Returns:
        A ``MagicMock`` with ``id``, ``name``, ``vote_type`` and a nested
        ``formula_config`` mock carrying ``duration_days``.
    """
    formula_config = MagicMock()
    formula_config.duration_days = duration_days

    protocol = MagicMock()
    protocol.id = uuid.uuid4() if protocol_id is None else protocol_id
    # ``name`` must be assigned after construction: as a constructor keyword
    # it would name the mock itself instead of creating an attribute.
    protocol.name = "Protocole standard"
    protocol.vote_type = "binary"
    protocol.formula_config = formula_config
    return protocol
|
|
|
|
|
|
def _make_db_for_advance(decision: MagicMock | None) -> AsyncMock:
    """Create a mock async database session for advance_decision.

    Args:
        decision: Object returned by ``scalar_one_or_none()`` on the result
            of every awaited ``execute`` call; pass ``None`` to simulate a
            decision that does not exist.

    Returns:
        An ``AsyncMock`` session whose ``execute``, ``commit``, ``refresh``
        and ``flush`` are awaitable mocks and whose ``add`` is a plain
        synchronous mock, matching the session mock built by
        ``_make_db_for_vote_session``.
    """
    result = MagicMock()
    result.scalar_one_or_none.return_value = decision

    db = AsyncMock()
    # ``return_value`` (not ``side_effect``): the same result is served on
    # every call, so the session can be reused for repeated lookups.
    db.execute = AsyncMock(return_value=result)
    db.commit = AsyncMock()
    db.refresh = AsyncMock()
    db.flush = AsyncMock()
    # Without this, ``db.add`` would be an auto-created AsyncMock child and a
    # synchronous ``db.add(obj)`` call would leave an un-awaited coroutine.
    db.add = MagicMock()
    return db
|
|
|
|
|
|
def _make_db_for_vote_session(
    decision: MagicMock | None,
    protocol: MagicMock | None = None,
) -> AsyncMock:
    """Create a mock async database session for create_vote_session_for_step.

    Sequential execute calls:
      1st -> decision lookup
      2nd -> protocol lookup (only queued when ``protocol`` is not ``None``)

    Args:
        decision: Object returned by ``scalar_one_or_none()`` on the first
            ``execute`` result; ``None`` simulates a missing decision.
        protocol: Object returned by ``scalar_one_or_none()`` on the second
            ``execute`` result. When ``None``, no second result is queued.

    Returns:
        An ``AsyncMock`` session with awaitable ``execute``, ``commit``,
        ``refresh`` and ``flush`` mocks and a synchronous ``add`` mock.
    """

    def _scalar_result(value: MagicMock | None) -> MagicMock:
        """Build an execute() result whose scalar_one_or_none() yields value."""
        result = MagicMock()
        result.scalar_one_or_none.return_value = value
        return result

    call_results = [_scalar_result(decision)]
    if protocol is not None:
        call_results.append(_scalar_result(protocol))

    db = AsyncMock()
    # ``side_effect`` with a list serves one queued result per execute() call.
    db.execute = AsyncMock(side_effect=call_results)
    db.commit = AsyncMock()
    db.refresh = AsyncMock()
    db.flush = AsyncMock()
    db.add = MagicMock()
    return db
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: advance_decision
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAdvanceDecision:
    """Test decision_service.advance_decision.

    Each test hands the service a mocked async session whose ``execute``
    result resolves to a mock decision, then checks the ``status`` values
    left on the mock decision and its steps.
    """

    @pytest.mark.asyncio
    async def test_advance_from_draft_activates_first_step(self):
        """Advancing a draft decision with pending steps activates the first one
        and moves decision to qualification."""
        decision_id = uuid.uuid4()
        step1 = _make_step(decision_id=decision_id, step_order=0, status="pending")
        step2 = _make_step(decision_id=decision_id, step_order=1, status="pending")

        decision = _make_decision(
            decision_id=decision_id,
            status="draft",
            steps=[step1, step2],
        )

        db = _make_db_for_advance(decision)

        await advance_decision(decision_id, db)

        assert step1.status == "active"
        assert step2.status == "pending"
        assert decision.status == "qualification"
        db.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_advance_completes_active_and_activates_next(self):
        """Advancing completes the current active step and activates the next pending one."""
        decision_id = uuid.uuid4()
        step1 = _make_step(decision_id=decision_id, step_order=0, status="active")
        step2 = _make_step(decision_id=decision_id, step_order=1, status="pending")

        decision = _make_decision(
            decision_id=decision_id,
            status="qualification",
            steps=[step1, step2],
        )

        db = _make_db_for_advance(decision)

        await advance_decision(decision_id, db)

        assert step1.status == "completed"
        assert step2.status == "active"
        db.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_advance_last_step_advances_status(self):
        """When the last active step is completed with no more pending steps,
        the decision status advances."""
        decision_id = uuid.uuid4()
        step1 = _make_step(decision_id=decision_id, step_order=0, status="completed")
        step2 = _make_step(decision_id=decision_id, step_order=1, status="active")

        decision = _make_decision(
            decision_id=decision_id,
            status="qualification",
            steps=[step1, step2],
        )

        db = _make_db_for_advance(decision)

        await advance_decision(decision_id, db)

        assert step2.status == "completed"
        assert decision.status == "review"
        db.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_advance_no_steps_advances_status(self):
        """A decision with no steps advances its status directly."""
        decision_id = uuid.uuid4()
        decision = _make_decision(
            decision_id=decision_id,
            status="draft",
            steps=[],
        )

        db = _make_db_for_advance(decision)

        await advance_decision(decision_id, db)

        assert decision.status == "qualification"
        db.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_advance_full_progression(self):
        """A decision with no steps can progress through all statuses."""
        decision_id = uuid.uuid4()
        decision = _make_decision(
            decision_id=decision_id,
            status="draft",
            steps=[],
        )

        statuses_seen = [decision.status]

        # Advance through all states until closed
        for _ in range(10):  # safety limit
            # Fresh session mock per call so no mock state carries over
            # from one advance to the next.
            db = _make_db_for_advance(decision)
            await advance_decision(decision_id, db)
            statuses_seen.append(decision.status)
            if decision.status == "closed":
                break

        assert statuses_seen == [
            "draft",
            "qualification",
            "review",
            "voting",
            "executed",
            "closed",
        ]

    @pytest.mark.asyncio
    async def test_advance_closed_raises_error(self):
        """Advancing a closed decision raises ValueError."""
        decision_id = uuid.uuid4()
        decision = _make_decision(
            decision_id=decision_id,
            status="closed",
            steps=[],
        )

        db = _make_db_for_advance(decision)

        with pytest.raises(ValueError, match="deja cloturee"):
            await advance_decision(decision_id, db)

    @pytest.mark.asyncio
    async def test_advance_not_found_raises_error(self):
        """Advancing a non-existent decision raises ValueError."""
        db = _make_db_for_advance(None)

        with pytest.raises(ValueError, match="Decision introuvable"):
            await advance_decision(uuid.uuid4(), db)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: create_vote_session_for_step
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCreateVoteSessionForStep:
    """Test decision_service.create_vote_session_for_step.

    Each test hands the service a mocked async session that serves queued
    ``execute`` results (decision first, then protocol when one is given).
    """

    @pytest.mark.asyncio
    async def test_create_vote_session_success(self):
        """A vote session is created and linked to the step."""
        decision_id = uuid.uuid4()
        step_id = uuid.uuid4()
        protocol_id = uuid.uuid4()

        step = _make_step(
            step_id=step_id,
            decision_id=decision_id,
            step_type="vote",
            status="active",
            vote_session_id=None,
        )

        decision = _make_decision(
            decision_id=decision_id,
            status="voting",
            voting_protocol_id=protocol_id,
            steps=[step],
        )

        protocol = _make_protocol(protocol_id=protocol_id, duration_days=14)
        db = _make_db_for_vote_session(decision, protocol)

        await create_vote_session_for_step(decision_id, step_id, db)

        # The step should now have a vote_session_id set
        assert step.vote_session_id is not None
        db.commit.assert_awaited_once()

    @pytest.mark.asyncio
    async def test_create_vote_session_decision_not_found(self):
        """ValueError when decision does not exist."""
        db = _make_db_for_vote_session(None)

        with pytest.raises(ValueError, match="Decision introuvable"):
            await create_vote_session_for_step(uuid.uuid4(), uuid.uuid4(), db)

    @pytest.mark.asyncio
    async def test_create_vote_session_step_not_found(self):
        """ValueError when step does not belong to the decision."""
        decision_id = uuid.uuid4()
        decision = _make_decision(decision_id=decision_id, steps=[])

        db = _make_db_for_vote_session(decision)

        with pytest.raises(ValueError, match="Etape introuvable"):
            await create_vote_session_for_step(decision_id, uuid.uuid4(), db)

    @pytest.mark.asyncio
    async def test_create_vote_session_step_already_has_session(self):
        """ValueError when step already has a vote session."""
        decision_id = uuid.uuid4()
        step_id = uuid.uuid4()
        existing_session_id = uuid.uuid4()

        step = _make_step(
            step_id=step_id,
            decision_id=decision_id,
            vote_session_id=existing_session_id,
        )

        decision = _make_decision(
            decision_id=decision_id,
            voting_protocol_id=uuid.uuid4(),
            steps=[step],
        )

        # No protocol result is queued: the error is expected before any
        # second execute() call is made.
        db = _make_db_for_vote_session(decision)

        with pytest.raises(ValueError, match="deja une session de vote"):
            await create_vote_session_for_step(decision_id, step_id, db)

    @pytest.mark.asyncio
    async def test_create_vote_session_no_protocol(self):
        """ValueError when decision has no voting protocol configured."""
        decision_id = uuid.uuid4()
        step_id = uuid.uuid4()

        step = _make_step(
            step_id=step_id,
            decision_id=decision_id,
            vote_session_id=None,
        )

        decision = _make_decision(
            decision_id=decision_id,
            voting_protocol_id=None,
            steps=[step],
        )

        db = _make_db_for_vote_session(decision)

        with pytest.raises(ValueError, match="Aucun protocole de vote"):
            await create_vote_session_for_step(decision_id, step_id, db)
|