Qualifier : corrections R2/R6 + router + modèle DB + wizard frontend

Corrections moteur (TDD) :
- R2 : within_mandate → record_in_observatory=True (Observatoire des décisions)
- R6 : >50 personnes → collective recommandé, pas obligatoire (confidence=recommended)
- R3 supprimée : affected_count=1 hors périmètre de l'outil
- R9-R12 renommés G1-G4 (garde-fous internes)
- 23 tests, 213/213 verts

Étape 1 — Router /api/v1/qualify :
- POST / → qualify() avec config depuis DB ou defaults
- GET /protocol → protocole actif
- POST /protocol → créer/remplacer (auth requise)

Étape 2 — Modèle QualificationProtocol :
- Table qualification_protocols (seuils configurables via admin)
- Migration Alembic + seed du protocole par défaut

Étape 3 — Wizard frontend decisions/new.vue :
- Étape 1 : formulaire de qualification (mandat, affected_count, structurant, contexte)
- Étape 2 : résultat (type, raisons, modalités, observatoire, on-chain)
- Étape 3 : formulaire de décision (titre, description, protocole si collectif)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Yvv
2026-04-23 19:12:01 +02:00
parent 428299c9c8
commit 5c51cffc93
11 changed files with 1060 additions and 519 deletions

View File

@@ -39,6 +39,7 @@ from app.models import ( # noqa: F401
MandateStep,
VotingProtocol,
FormulaConfig,
QualificationProtocol,
SanctuaryEntry,
BlockchainCache,
)

View File

@@ -0,0 +1,39 @@
"""add_qualification_protocol
Revision ID: b78571ae9e00
Revises: 70914b334cfb
Create Date: 2026-04-23 17:08:07.161306+00:00
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
revision: str = 'b78571ae9e00'
down_revision: Union[str, None] = '70914b334cfb'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
'qualification_protocols',
sa.Column('id', sa.Uuid(), nullable=False),
sa.Column('name', sa.String(128), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('small_group_max', sa.Integer(), nullable=False, server_default='5'),
sa.Column('collective_wot_min', sa.Integer(), nullable=False, server_default='50'),
sa.Column(
'default_modalities_json',
sa.Text(),
nullable=False,
server_default='["vote_wot","vote_smith","consultation_avis","election"]',
),
sa.Column('is_active', sa.Boolean(), nullable=False, server_default='1'),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
sa.PrimaryKeyConstraint('id'),
)
def downgrade() -> None:
op.drop_table('qualification_protocols')

View File

@@ -18,7 +18,7 @@ class DecisionType(str, Enum):
# ---------------------------------------------------------------------------
# Configuration (thresholds — stored as a QualificationProtocol in DB)
# Configuration (thresholds — stored as QualificationProtocol in DB)
# ---------------------------------------------------------------------------
@@ -26,16 +26,15 @@ class DecisionType(str, Enum):
class QualificationConfig:
"""Configurable thresholds for the qualification engine.
These defaults will be seeded as a QualificationProtocol record so they
can be adjusted through the admin interface without code changes.
Seeded as a QualificationProtocol record so they can be adjusted
through the admin interface without code changes.
individual_max: affected_count <= this → always individual
small_group_max: affected_count <= this → individual recommended, collective available
collective_wot_min: affected_count > this → collective required (WoT formula applies)
small_group_max: affected_count <= this → individual recommended, collective available
collective_wot_min: affected_count > this WoT formula applicable (still recommended, not required)
Default modalities shown when collective is chosen (ordered by relevance).
affected_count must be >= 2 — decisions affecting only the author
have no place in this tool.
"""
individual_max: int = 1
small_group_max: int = 5
collective_wot_min: int = 50
@@ -55,7 +54,7 @@ class QualificationConfig:
@dataclass
class QualificationInput:
within_mandate: bool = False
affected_count: int | None = None
affected_count: int | None = None # must be >= 2 when within_mandate=False
is_structural: bool = False
context_description: str | None = None # reserved for LLM suggestion
@@ -66,10 +65,11 @@ class QualificationResult:
process: str
recommended_modalities: list[str]
recommend_onchain: bool
confidence: str # "required" | "recommended" | "optional"
onchain_reason: str | None
confidence: str # "required" | "recommended" | "optional"
collective_available: bool
record_in_observatory: bool # True → decision must be logged in Observatoire
reasons: list[str]
onchain_reason: str | None = None
# ---------------------------------------------------------------------------
@@ -83,9 +83,9 @@ def suggest_modalities_from_context(
) -> list[str]:
"""Suggest voting modalities based on a natural-language context description.
Stub — returns empty list until local Qwen (qwen3.6) is integrated.
When implemented, this will call the LLM API and return an ordered list
of modality slugs from config.default_modalities.
Stub — returns empty list until local Qwen (qwen3.6, MacStudio) is integrated.
When implemented, will call the LLM API and return an ordered subset of
config.default_modalities ranked by contextual relevance.
"""
return []
@@ -99,11 +99,11 @@ def qualify(inp: QualificationInput, config: QualificationConfig) -> Qualificati
"""Qualify a decision and recommend a type, process, and modalities.
Rules (in priority order):
R1/R2 within_mandate → individual + consultation_avis, no modalities
R3 affected_count == 1 → individual + personal
R4 affected_count ≤ small_group_max → individual recommended, collective available
R1/R2 within_mandate → individual + consultation_avis, no vote modalities,
decision must be recorded in Observatoire des décisions
R4 2 ≤ affected_count ≤ small_group_max → individual recommended, collective available
R5 small_group_max < affected_count ≤ collective_wot_min → collective recommended
R6 affected_count > collective_wot_min → collective required (WoT)
R6 affected_count > collective_wot_min → collective recommended (WoT formula applicable)
R7/R8 is_structural → recommend_onchain with reason
"""
reasons: list[str] = []
@@ -116,27 +116,14 @@ def qualify(inp: QualificationInput, config: QualificationConfig) -> Qualificati
process="consultation_avis",
recommended_modalities=[],
recommend_onchain=_onchain(inp, reasons),
onchain_reason=_onchain_reason(inp),
confidence="required",
collective_available=False,
record_in_observatory=True,
reasons=reasons,
onchain_reason=_onchain_reason(inp),
)
count = inp.affected_count if inp.affected_count is not None else 1
# ── R3: single person ───────────────────────────────────────────────────
if count <= config.individual_max:
reasons.append("Une seule personne concernée.")
return QualificationResult(
decision_type=DecisionType.INDIVIDUAL,
process="personal",
recommended_modalities=[],
recommend_onchain=_onchain(inp, reasons),
confidence="required",
collective_available=False,
reasons=reasons,
onchain_reason=_onchain_reason(inp),
)
count = inp.affected_count if inp.affected_count is not None else 2
# ── R4: small group → individual recommended, collective available ───────
if count <= config.small_group_max:
@@ -144,32 +131,30 @@ def qualify(inp: QualificationInput, config: QualificationConfig) -> Qualificati
f"{count} personnes concernées : décision individuelle recommandée, "
"vote collectif possible."
)
modalities = _collect_modalities(inp, config)
return QualificationResult(
decision_type=DecisionType.INDIVIDUAL,
process="personal",
recommended_modalities=[],
recommend_onchain=_onchain(inp, reasons),
onchain_reason=_onchain_reason(inp),
confidence="recommended",
collective_available=True,
record_in_observatory=False,
reasons=reasons,
onchain_reason=_onchain_reason(inp),
)
# ── R5/R6: medium or large group → collective ────────────────────────────
modalities = _collect_modalities(inp, config)
if count <= config.collective_wot_min:
reasons.append(
f"{count} personnes concernées : vote collectif recommandé."
)
reasons.append(f"{count} personnes concernées : vote collectif recommandé.")
confidence = "recommended"
else:
reasons.append(
f"{count} personnes concernées : vote collectif obligatoire "
"(formule WoT applicable)."
f"{count} personnes concernées : vote collectif recommandé "
"(formule WoT applicable à cette échelle)."
)
confidence = "required"
confidence = "recommended"
if "vote_wot" not in modalities:
modalities = ["vote_wot"] + modalities
@@ -178,10 +163,11 @@ def qualify(inp: QualificationInput, config: QualificationConfig) -> Qualificati
process="vote_collective",
recommended_modalities=modalities,
recommend_onchain=_onchain(inp, reasons),
onchain_reason=_onchain_reason(inp),
confidence=confidence,
collective_available=True,
record_in_observatory=False,
reasons=reasons,
onchain_reason=_onchain_reason(inp),
)
@@ -210,10 +196,7 @@ def _onchain_reason(inp: QualificationInput) -> str | None:
)
def _collect_modalities(
inp: QualificationInput,
config: QualificationConfig,
) -> list[str]:
def _collect_modalities(inp: QualificationInput, config: QualificationConfig) -> list[str]:
"""Combine default modalities with any LLM suggestions (stub for now)."""
llm_suggestions = []
if inp.context_description:

View File

@@ -14,6 +14,7 @@ from app.middleware.security_headers import SecurityHeadersMiddleware
from app.routers import auth, documents, decisions, votes, mandates, protocols, sanctuary, websocket
from app.routers import public
from app.routers import organizations
from app.routers import qualify
# ── Structured logging setup ───────────────────────────────────────────────
@@ -133,6 +134,7 @@ app.include_router(sanctuary.router, prefix="/api/v1/sanctuary", tags=["sanctuar
app.include_router(websocket.router, prefix="/api/v1/ws", tags=["websocket"])
app.include_router(public.router, prefix="/api/v1/public", tags=["public"])
app.include_router(organizations.router, prefix="/api/v1/organizations", tags=["organizations"])
app.include_router(qualify.router, prefix="/api/v1/qualify", tags=["qualify"])
# ── Health check ─────────────────────────────────────────────────────────

View File

@@ -5,6 +5,7 @@ from app.models.decision import Decision, DecisionStep
from app.models.vote import VoteSession, Vote
from app.models.mandate import Mandate, MandateStep
from app.models.protocol import VotingProtocol, FormulaConfig
from app.models.qualification import QualificationProtocol
from app.models.sanctuary import SanctuaryEntry
from app.models.cache import BlockchainCache
@@ -16,6 +17,7 @@ __all__ = [
"VoteSession", "Vote",
"Mandate", "MandateStep",
"VotingProtocol", "FormulaConfig",
"QualificationProtocol",
"SanctuaryEntry",
"BlockchainCache",
]

View File

@@ -0,0 +1,38 @@
import json
import uuid
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, String, Text, Uuid, func
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class QualificationProtocol(Base):
"""Active configuration for the decision qualification engine.
Thresholds stored here override the engine defaults and can be updated
through the admin interface (meta-governance).
Only one record should be active at a time (is_active=True).
"""
__tablename__ = "qualification_protocols"
id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(128), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
small_group_max: Mapped[int] = mapped_column(Integer, default=5)
collective_wot_min: Mapped[int] = mapped_column(Integer, default=50)
# JSON array of modality slugs, e.g. '["vote_wot","vote_smith","election"]'
default_modalities_json: Mapped[str] = mapped_column(
Text,
default='["vote_wot","vote_smith","consultation_avis","election"]',
)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
@property
def default_modalities(self) -> list[str]:
return json.loads(self.default_modalities_json)

View File

@@ -0,0 +1,110 @@
"""Qualify router: decision qualification engine endpoint."""
from __future__ import annotations
from dataclasses import asdict
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.engine.qualifier import QualificationConfig, QualificationInput, qualify
from app.models.qualification import QualificationProtocol
from app.schemas.qualification import (
QualificationProtocolCreate,
QualificationProtocolOut,
QualifyRequest,
QualifyResponse,
)
from app.services.auth_service import get_current_identity
router = APIRouter()
async def _load_config(db: AsyncSession) -> QualificationConfig:
"""Load the active QualificationProtocol from DB, or fall back to defaults."""
result = await db.execute(
select(QualificationProtocol)
.where(QualificationProtocol.is_active == True) # noqa: E712
.order_by(QualificationProtocol.created_at.desc())
.limit(1)
)
proto = result.scalar_one_or_none()
if proto is None:
return QualificationConfig()
return QualificationConfig(
small_group_max=proto.small_group_max,
collective_wot_min=proto.collective_wot_min,
default_modalities=proto.default_modalities,
)
@router.post("/", response_model=QualifyResponse)
async def qualify_decision(
payload: QualifyRequest,
db: AsyncSession = Depends(get_db),
) -> QualifyResponse:
"""Qualify a decision: determine type, process, and modalities.
No authentication required — this is an advisory endpoint that helps
users understand which decision pathway fits their situation.
"""
config = await _load_config(db)
inp = QualificationInput(
within_mandate=payload.within_mandate,
affected_count=payload.affected_count,
is_structural=payload.is_structural,
context_description=payload.context_description,
)
result = qualify(inp, config)
return QualifyResponse(**asdict(result))
@router.get("/protocol", response_model=QualificationProtocolOut | None)
async def get_active_protocol(
db: AsyncSession = Depends(get_db),
) -> QualificationProtocolOut | None:
"""Return the currently active qualification protocol (thresholds)."""
result = await db.execute(
select(QualificationProtocol)
.where(QualificationProtocol.is_active == True) # noqa: E712
.order_by(QualificationProtocol.created_at.desc())
.limit(1)
)
proto = result.scalar_one_or_none()
if proto is None:
return None
return QualificationProtocolOut.model_validate(proto)
@router.post("/protocol", response_model=QualificationProtocolOut, status_code=201)
async def create_protocol(
payload: QualificationProtocolCreate,
db: AsyncSession = Depends(get_db),
_identity=Depends(get_current_identity),
) -> QualificationProtocolOut:
"""Create a new qualification protocol (requires auth).
Deactivates the current active protocol before saving the new one.
"""
# Deactivate current
current = await db.execute(
select(QualificationProtocol).where(QualificationProtocol.is_active == True) # noqa: E712
)
for proto in current.scalars().all():
proto.is_active = False
import json
proto = QualificationProtocol(
name=payload.name,
description=payload.description,
small_group_max=payload.small_group_max,
collective_wot_min=payload.collective_wot_min,
default_modalities_json=json.dumps(payload.default_modalities),
is_active=True,
)
db.add(proto)
await db.commit()
await db.refresh(proto)
return QualificationProtocolOut.model_validate(proto)

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
class QualifyRequest(BaseModel):
within_mandate: bool = False
affected_count: int | None = Field(default=None, ge=2, description="Nombre de personnes concernées (minimum 2)")
is_structural: bool = False
context_description: str | None = Field(default=None, max_length=2000)
class QualifyResponse(BaseModel):
decision_type: str
process: str
recommended_modalities: list[str]
recommend_onchain: bool
onchain_reason: str | None
confidence: str
collective_available: bool
record_in_observatory: bool
reasons: list[str]
class QualificationProtocolOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: str | None = None
small_group_max: int
collective_wot_min: int
default_modalities: list[str]
is_active: bool
created_at: datetime
class QualificationProtocolCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=128)
description: str | None = None
small_group_max: int = Field(default=5, ge=1)
collective_wot_min: int = Field(default=50, ge=1)
default_modalities: list[str] = Field(
default=["vote_wot", "vote_smith", "consultation_avis", "election"]
)

View File

@@ -1,21 +1,21 @@
"""TDD — Moteur de qualification des décisions.
Ce fichier est la source de vérité exécutable des règles métier du tunnel "Décider".
Chaque test est nommé d'après la règle qu'il encode.
Source de vérité exécutable des règles métier du tunnel "Décider".
Règles implémentées dans ce fichier (RED → GREEN au fur et à mesure) :
R1 within_mandate → individual + consultation_avis
R2 within_mandate → aucune modalité de vote n'est proposée
R3 affected_count == 1 (hors mandat) → individual
R4 affected_count ≤ small_group_max → individual recommandé / collective possible
R5 small_group_max < affected_count collective_wot_min → collective recommandé
R6 affected_count > collective_wot_min → collective WoT obligatoire
R7 is_structural=True → recommend_onchain=True + raison explicite
R8 is_structural=False → recommend_onchain=False (on ne pollue pas avec l'option)
R9 decision_type est toujours dans l'enum autorisé
R10 individual n'expose jamais de modalités de vote
R11 collective expose au moins une modalité
R12 les seuils sont lus depuis QualificationConfig (configurables)
Règles testées :
R1 within_mandate → individual + consultation_avis
R2 within_mandate → aucune modalité de vote + consignation Observatoire
R4 2 ≤ affected_count ≤ small_group_max → individual recommandé, collectif disponible
R5 small_group_max < affected_count ≤ collective_wot_min → collective recommandé
R6 affected_count > collective_wot_min → collective recommandé (WoT applicable, non obligatoire)
R7 is_structural → recommend_onchain + raison explicite
R8 is_structural=False → recommend_onchain=False
GARDE-FOUS (invariants internes qui ne doivent jamais régresser) :
G1 decision_type est toujours dans l'enum autorisé
G2 individual n'expose jamais de modalités de vote
G3 collective expose au moins une modalité
G4 les seuils sont lus depuis QualificationConfig (configurables)
"""
from __future__ import annotations
@@ -30,11 +30,7 @@ from app.engine.qualifier import (
qualify,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
DEFAULT_CONFIG = QualificationConfig() # valeurs par défaut
DEFAULT_CONFIG = QualificationConfig()
# ---------------------------------------------------------------------------
@@ -43,19 +39,17 @@ DEFAULT_CONFIG = QualificationConfig() # valeurs par défaut
def test_r1_within_mandate_gives_individual():
"""Une décision dans le périmètre d'un mandat est toujours individuelle."""
result = qualify(QualificationInput(within_mandate=True), DEFAULT_CONFIG)
assert result.decision_type == DecisionType.INDIVIDUAL
def test_r1_within_mandate_gives_consultation_avis():
"""La décision dans un mandat utilise le processus 'consultation_avis'."""
result = qualify(QualificationInput(within_mandate=True), DEFAULT_CONFIG)
assert result.process == "consultation_avis"
def test_r1_within_mandate_overrides_large_affected_count():
"""Même si de nombreuses personnes sont concernées, un mandat impose 'individual'."""
"""Même si de nombreuses personnes sont concernées, un mandat impose individual."""
result = qualify(
QualificationInput(within_mandate=True, affected_count=500),
DEFAULT_CONFIG,
@@ -65,35 +59,29 @@ def test_r1_within_mandate_overrides_large_affected_count():
# ---------------------------------------------------------------------------
# R2 — within_mandate → aucune modalité de vote
# R2 — within_mandate → aucune modalité de vote + consignation Observatoire
# ---------------------------------------------------------------------------
def test_r2_within_mandate_no_modalities():
def test_r2_within_mandate_no_vote_modalities():
"""Le mandataire décide seul après consultation — pas de vote collectif."""
result = qualify(QualificationInput(within_mandate=True), DEFAULT_CONFIG)
assert result.recommended_modalities == []
# ---------------------------------------------------------------------------
# R3 — affected_count == 1 hors mandat → individual
# ---------------------------------------------------------------------------
def test_r2_within_mandate_records_in_observatory():
"""Une décision dans un mandat doit être consignée dans l'Observatoire."""
result = qualify(QualificationInput(within_mandate=True), DEFAULT_CONFIG)
assert result.record_in_observatory is True
def test_r3_single_person_is_individual():
def test_r2_out_of_mandate_does_not_force_observatory():
"""Hors mandat, la consignation dans l'Observatoire n'est pas imposée."""
result = qualify(
QualificationInput(within_mandate=False, affected_count=1),
QualificationInput(within_mandate=False, affected_count=10),
DEFAULT_CONFIG,
)
assert result.decision_type == DecisionType.INDIVIDUAL
def test_r3_single_person_process_is_personal():
result = qualify(
QualificationInput(within_mandate=False, affected_count=1),
DEFAULT_CONFIG,
)
assert result.process == "personal"
assert result.record_in_observatory is False
# ---------------------------------------------------------------------------
@@ -102,7 +90,6 @@ def test_r3_single_person_process_is_personal():
def test_r4_small_group_recommends_individual():
"""2 à small_group_max personnes : individual recommandé, collective possible."""
for count in range(2, DEFAULT_CONFIG.small_group_max + 1):
result = qualify(
QualificationInput(within_mandate=False, affected_count=count),
@@ -111,9 +98,14 @@ def test_r4_small_group_recommends_individual():
assert result.decision_type == DecisionType.INDIVIDUAL, (
f"affected_count={count} devrait recommander individual"
)
assert result.collective_available is True, (
f"affected_count={count} : collective doit rester disponible"
)
def test_r4_small_group_collective_is_available():
result = qualify(
QualificationInput(within_mandate=False, affected_count=3),
DEFAULT_CONFIG,
)
assert result.collective_available is True
def test_r4_small_group_confidence_is_recommended():
@@ -148,11 +140,11 @@ def test_r5_medium_group_confidence_is_recommended():
# ---------------------------------------------------------------------------
# R6 — affected_count > collective_wot_min → collective WoT requis
# R6 — affected_count > collective_wot_min → collective recommandé (pas obligatoire)
# ---------------------------------------------------------------------------
def test_r6_large_group_requires_collective():
def test_r6_large_group_recommends_collective():
result = qualify(
QualificationInput(within_mandate=False, affected_count=DEFAULT_CONFIG.collective_wot_min + 1),
DEFAULT_CONFIG,
@@ -160,12 +152,13 @@ def test_r6_large_group_requires_collective():
assert result.decision_type == DecisionType.COLLECTIVE
def test_r6_large_group_confidence_is_required():
def test_r6_large_group_confidence_is_recommended_not_required():
"""Au-delà du seuil WoT, le vote collectif est recommandé — pas imposé."""
result = qualify(
QualificationInput(within_mandate=False, affected_count=DEFAULT_CONFIG.collective_wot_min + 1),
DEFAULT_CONFIG,
)
assert result.confidence == "required"
assert result.confidence == "recommended"
def test_r6_large_group_includes_vote_wot_modality():
@@ -182,7 +175,6 @@ def test_r6_large_group_includes_vote_wot_modality():
def test_r7_structural_recommends_onchain():
"""Décision structurante (force de loi ou action machine) → on-chain proposé."""
result = qualify(
QualificationInput(within_mandate=False, affected_count=100, is_structural=True),
DEFAULT_CONFIG,
@@ -195,14 +187,13 @@ def test_r7_structural_provides_onchain_reason():
QualificationInput(within_mandate=False, affected_count=100, is_structural=True),
DEFAULT_CONFIG,
)
assert result.onchain_reason is not None
assert len(result.onchain_reason) > 0
assert result.onchain_reason is not None and len(result.onchain_reason) > 0
def test_r7_structural_individual_can_also_recommend_onchain():
"""Même une décision individuelle structurante peut être gravée on-chain."""
def test_r7_structural_within_mandate_can_also_recommend_onchain():
"""Même une décision dans un mandat peut être gravée si structurante."""
result = qualify(
QualificationInput(within_mandate=False, affected_count=1, is_structural=True),
QualificationInput(within_mandate=True, is_structural=True),
DEFAULT_CONFIG,
)
assert result.recommend_onchain is True
@@ -214,48 +205,37 @@ def test_r7_structural_individual_can_also_recommend_onchain():
def test_r8_non_structural_never_recommends_onchain():
"""On ne pollue pas les décisions ordinaires avec l'option on-chain."""
for count in [1, 3, 10, 100]:
for count in [2, 3, 10, 100]:
result = qualify(
QualificationInput(within_mandate=False, affected_count=count, is_structural=False),
DEFAULT_CONFIG,
)
assert result.recommend_onchain is False, (
f"affected_count={count}, is_structural=False : on-chain ne doit pas être proposé"
f"affected_count={count} non structurant : on-chain ne doit pas être proposé"
)
# ---------------------------------------------------------------------------
# R9 — decision_type toujours dans l'enum autorisé
# GARDE-FOUS internes (régressions silencieuses)
# ---------------------------------------------------------------------------
def test_r9_decision_type_always_valid():
def test_g1_decision_type_always_valid():
valid_types = set(DecisionType)
inputs = [
QualificationInput(within_mandate=True),
QualificationInput(within_mandate=False, affected_count=1),
QualificationInput(within_mandate=False, affected_count=3),
QualificationInput(within_mandate=False, affected_count=10),
QualificationInput(within_mandate=False, affected_count=100),
QualificationInput(within_mandate=False, affected_count=100, is_structural=True),
]
for inp in inputs:
result = qualify(inp, DEFAULT_CONFIG)
assert result.decision_type in valid_types, (
f"Type invalide '{result.decision_type}' pour input {inp}"
)
# ---------------------------------------------------------------------------
# R10 — individual n'expose jamais de modalités
# ---------------------------------------------------------------------------
def test_r10_individual_no_modalities():
for inp in [
QualificationInput(within_mandate=True),
QualificationInput(within_mandate=False, affected_count=1),
QualificationInput(within_mandate=False, affected_count=2),
QualificationInput(within_mandate=False, affected_count=10),
QualificationInput(within_mandate=False, affected_count=100),
]:
result = qualify(inp, DEFAULT_CONFIG)
assert result.decision_type in valid_types
def test_g2_individual_never_has_vote_modalities():
for inp in [
QualificationInput(within_mandate=True),
QualificationInput(within_mandate=False, affected_count=2),
QualificationInput(within_mandate=False, affected_count=3),
]:
result = qualify(inp, DEFAULT_CONFIG)
@@ -265,12 +245,7 @@ def test_r10_individual_no_modalities():
)
# ---------------------------------------------------------------------------
# R11 — collective expose au moins une modalité
# ---------------------------------------------------------------------------
def test_r11_collective_has_at_least_one_modality():
def test_g3_collective_has_at_least_one_modality():
result = qualify(
QualificationInput(within_mandate=False, affected_count=20),
DEFAULT_CONFIG,
@@ -279,19 +254,9 @@ def test_r11_collective_has_at_least_one_modality():
assert len(result.recommended_modalities) >= 1
# ---------------------------------------------------------------------------
# R12 — les seuils sont lus depuis QualificationConfig
# ---------------------------------------------------------------------------
def test_r12_custom_config_overrides_thresholds():
"""Les seuils viennent de QualificationConfig, pas de constantes hardcodées."""
custom = QualificationConfig(
individual_max=1,
small_group_max=2, # au lieu de 5
collective_wot_min=10,
)
# affected_count=3 est au-delà de small_group_max=2 → collective
def test_g4_custom_config_overrides_thresholds():
"""Les seuils viennent de QualificationConfig — pas de constantes hardcodées."""
custom = QualificationConfig(small_group_max=2, collective_wot_min=10)
result = qualify(
QualificationInput(within_mandate=False, affected_count=3),
custom,
@@ -299,9 +264,7 @@ def test_r12_custom_config_overrides_thresholds():
assert result.decision_type == DecisionType.COLLECTIVE
def test_r12_default_config_thresholds_are_expected():
"""Valeurs par défaut documentées et vérifiables."""
def test_g4_default_thresholds_are_stable():
cfg = QualificationConfig()
assert cfg.individual_max == 1
assert cfg.small_group_max == 5
assert cfg.collective_wot_min == 50

View File

@@ -33,6 +33,7 @@ from app.models.document import Document, DocumentItem
from app.models.decision import Decision, DecisionStep
from app.models.mandate import Mandate, MandateStep
from app.models.organization import Organization
from app.models.qualification import QualificationProtocol
from app.models.user import DuniterIdentity
from app.models.vote import VoteSession, Vote
@@ -2459,6 +2460,37 @@ async def seed_organizations(session: AsyncSession) -> dict[str, Organization]:
# Main seed runner
# ---------------------------------------------------------------------------
async def seed_qualification_protocol(session: AsyncSession) -> QualificationProtocol:
"""Seed the default qualification protocol (thresholds for the Décider tunnel)."""
stmt = select(QualificationProtocol).where(QualificationProtocol.is_active == True) # noqa: E712
result = await session.execute(stmt)
existing = result.scalar_one_or_none()
if existing is not None:
print(" [skip] Protocole de qualification déjà présent")
return existing
proto = QualificationProtocol(
name="Protocole de qualification par défaut",
description=(
"Seuils utilisés par le tunnel Décider pour router vers "
"individual/collective et proposer les modalités de vote."
),
small_group_max=5,
collective_wot_min=50,
default_modalities_json=json.dumps([
"vote_wot",
"vote_smith",
"consultation_avis",
"election",
]),
is_active=True,
)
session.add(proto)
await session.flush()
print(" [ok] Protocole de qualification créé")
return proto
async def run_seed():
print("=" * 60)
print("libreDecision - Seed Database")
@@ -2474,6 +2506,9 @@ async def run_seed():
orgs = await seed_organizations(session)
duniter_g1_id = orgs["duniter-g1"].id
print("\n[0b] Protocole de qualification...")
await seed_qualification_protocol(session)
print("\n[1/10] Formula Configs...")
formulas = await seed_formula_configs(session)