Sprint 1 : scaffolding complet de Glibredecision

Plateforme de decisions collectives pour Duniter/G1.
Backend FastAPI async + PostgreSQL (14 tables, 8 routers, 6 services,
moteur de vote avec formule d'inertie WoT/Smith/TechComm).
Frontend Nuxt 4 + Nuxt UI v3 + Pinia (9 pages, 5 stores).
Infrastructure Docker + Woodpecker CI + Traefik.
Documentation technique et utilisateur (15 fichiers).
Seed : Licence G1, Engagement Forgeron v2.0.0, 4 protocoles de vote.
30 tests unitaires (formules, mode params, vote nuance) -- tous verts.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Yvv
2026-02-28 12:46:11 +01:00
commit 25437f24e3
100 changed files with 10236 additions and 0 deletions

0
backend/app/__init__.py Normal file
View File

33
backend/app/config.py Normal file
View File

@@ -0,0 +1,33 @@
from pydantic_settings import BaseSettings
from pathlib import Path
class Settings(BaseSettings):
APP_NAME: str = "Glibredecision"
DEBUG: bool = True
# Database
DATABASE_URL: str = "postgresql+asyncpg://glibredecision:change-me-in-production@localhost:5432/glibredecision"
# Auth
SECRET_KEY: str = "change-me-in-production-with-a-real-secret-key"
CHALLENGE_EXPIRE_SECONDS: int = 300
TOKEN_EXPIRE_HOURS: int = 24
# Duniter V2 RPC
DUNITER_RPC_URL: str = "wss://gdev.p2p.legal/ws"
# IPFS
IPFS_API_URL: str = "http://localhost:5001"
IPFS_GATEWAY_URL: str = "http://localhost:8080"
# CORS
CORS_ORIGINS: list[str] = ["http://localhost:3002"]
# Paths
BASE_DIR: Path = Path(__file__).resolve().parent.parent
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
settings = Settings()

21
backend/app/database.py Normal file
View File

@@ -0,0 +1,21 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
engine = create_async_engine(settings.DATABASE_URL, echo=settings.DEBUG)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
pass
async def get_db():
async with async_session() as session:
yield session
async def init_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

View File

View File

@@ -0,0 +1,107 @@
"""Parse mode-parameter strings into structured dicts.
A mode-params string encodes voting formula parameters in a compact format.
Example: ``"D30M50B.1G.2T.1"``
Supported codes:
D = duration_days (int)
M = majority_pct (int, 0-100)
B = base_exponent (float)
G = gradient_exponent (float)
C = constant_base (float)
S = smith_exponent (float)
T = techcomm_exponent (float)
N = ratio_multiplier (float)
R = ratio_mode (bool, 0 or 1)
Values may start with a dot for decimals < 1, e.g. ``B.1`` means base_exponent=0.1.
"""
from __future__ import annotations
import re
# Ordered list of recognised codes and their target keys + types
_CODES: dict[str, tuple[str, type]] = {
"D": ("duration_days", int),
"M": ("majority_pct", int),
"B": ("base_exponent", float),
"G": ("gradient_exponent", float),
"C": ("constant_base", float),
"S": ("smith_exponent", float),
"T": ("techcomm_exponent", float),
"N": ("ratio_multiplier", float),
"R": ("is_ratio_mode", bool),
}
# Regex: a single uppercase letter followed by a numeric value (int or float,
# possibly starting with '.' for values like .1 meaning 0.1)
_PARAM_RE = re.compile(r"([A-Z])(\d*\.?\d+)")
def parse_mode_params(params_str: str) -> dict:
"""Parse a mode-params string into a parameter dict.
Parameters
----------
params_str:
Compact parameter string, e.g. ``"D30M50B.1G.2T.1"``.
Returns
-------
dict
Keys present depend on codes found in the string. Defaults are
applied for any code not present::
{
"duration_days": 30,
"majority_pct": 50,
"base_exponent": 0.1,
"gradient_exponent": 0.2,
"constant_base": 0.0,
"smith_exponent": None,
"techcomm_exponent": None,
"ratio_multiplier": None,
"is_ratio_mode": False,
}
Raises
------
ValueError
If an unrecognised code letter is found.
"""
defaults: dict = {
"duration_days": 30,
"majority_pct": 50,
"base_exponent": 0.1,
"gradient_exponent": 0.2,
"constant_base": 0.0,
"smith_exponent": None,
"techcomm_exponent": None,
"ratio_multiplier": None,
"is_ratio_mode": False,
}
if not params_str or not params_str.strip():
return dict(defaults)
result = dict(defaults)
for match in _PARAM_RE.finditer(params_str):
code = match.group(1)
raw_value = match.group(2)
if code not in _CODES:
raise ValueError(f"Code de parametre inconnu : '{code}'")
key, target_type = _CODES[code]
if target_type is int:
result[key] = int(float(raw_value))
elif target_type is float:
result[key] = float(raw_value)
elif target_type is bool:
result[key] = float(raw_value) != 0.0
return result

View File

@@ -0,0 +1,95 @@
"""Six-level nuanced vote evaluation.
Levels:
0 - CONTRE
1 - PAS DU TOUT
2 - PAS D'ACCORD
3 - NEUTRE
4 - D'ACCORD
5 - TOUT A FAIT
Adoption rule:
The sum of votes at levels 3 + 4 + 5 must be >= threshold_pct% of total votes.
A minimum number of participants is also required.
"""
from __future__ import annotations
LEVEL_LABELS: dict[int, str] = {
0: "CONTRE",
1: "PAS DU TOUT",
2: "PAS D'ACCORD",
3: "NEUTRE",
4: "D'ACCORD",
5: "TOUT A FAIT",
}
NUM_LEVELS = 6
def evaluate_nuanced(
votes: list[int],
threshold_pct: int = 80,
min_participants: int = 59,
) -> dict:
"""Evaluate a nuanced vote from a list of individual vote levels.
Parameters
----------
votes:
List of vote levels (each 0-5). One entry per voter.
threshold_pct:
Minimum percentage of positive votes (levels 3-5) out of total
for adoption.
min_participants:
Minimum number of participants required for validity.
Returns
-------
dict
{
"total": int,
"per_level_counts": {0: int, 1: int, ..., 5: int},
"positive_count": int, # levels 3 + 4 + 5
"positive_pct": float, # 0.0 - 100.0
"threshold_met": bool,
"min_participants_met": bool,
"adopted": bool,
}
Raises
------
ValueError
If any vote value is outside the 0-5 range.
"""
# Validate vote levels
for v in votes:
if v < 0 or v > 5:
raise ValueError(
f"Niveau de vote invalide : {v}. Les niveaux valides sont 0-5."
)
total = len(votes)
per_level_counts: dict[int, int] = {level: 0 for level in range(NUM_LEVELS)}
for v in votes:
per_level_counts[v] += 1
# Positive = levels 3 (NEUTRE), 4 (D'ACCORD), 5 (TOUT A FAIT)
positive_count = per_level_counts[3] + per_level_counts[4] + per_level_counts[5]
positive_pct = (positive_count / total * 100.0) if total > 0 else 0.0
threshold_met = positive_pct >= threshold_pct
min_participants_met = total >= min_participants
adopted = threshold_met and min_participants_met
return {
"total": total,
"per_level_counts": per_level_counts,
"positive_count": positive_count,
"positive_pct": round(positive_pct, 2),
"threshold_met": threshold_met,
"min_participants_met": min_participants_met,
"adopted": adopted,
}

View File

@@ -0,0 +1,31 @@
"""Smith sub-WoT threshold criterion.
The Smith criterion requires a minimum number of votes from Smith members
(forgerons) for certain decisions to be valid.
Formula: ceil(SmithWotSize ^ S)
"""
from __future__ import annotations
import math
def smith_threshold(smith_wot_size: int, exponent: float = 0.1) -> int:
"""Compute the minimum number of Smith member votes required.
Parameters
----------
smith_wot_size:
Number of active Smith members.
exponent:
S in the formula ``ceil(smith_wot_size^S)``.
Returns
-------
int
Minimum Smith votes required.
"""
if smith_wot_size <= 0:
raise ValueError("smith_wot_size doit etre strictement positif")
return math.ceil(smith_wot_size ** exponent)

View File

@@ -0,0 +1,31 @@
"""Technical Committee threshold criterion.
The TechComm criterion requires a minimum number of votes from
Technical Committee members for certain decisions.
Formula: ceil(CoTecSize ^ T)
"""
from __future__ import annotations
import math
def techcomm_threshold(cotec_size: int, exponent: float = 0.1) -> int:
"""Compute the minimum number of TechComm member votes required.
Parameters
----------
cotec_size:
Number of Technical Committee members.
exponent:
T in the formula ``ceil(cotec_size^T)``.
Returns
-------
int
Minimum TechComm votes required.
"""
if cotec_size <= 0:
raise ValueError("cotec_size doit etre strictement positif")
return math.ceil(cotec_size ** exponent)

View File

@@ -0,0 +1,85 @@
"""WoT members threshold formula for binary votes.
Core formula:
Result = C + B^W + (M + (1-M) * (1 - (T/W)^G)) * max(0, T - C)
Where:
C = constant_base
B = base_exponent
W = wot_size (corpus of eligible voters)
T = total_votes (for + against)
M = majority_ratio (majority_pct / 100)
G = gradient_exponent
Inertia behaviour:
- Low participation (T << W) -> near-unanimity required
- High participation (T -> W) -> simple majority M suffices
Reference test case:
wot_size=7224, votes_for=97, votes_against=23 (total=120)
params M50 B.1 G.2 => threshold=94, adopted (97 >= 94)
"""
from __future__ import annotations
import math
def wot_threshold(
wot_size: int,
total_votes: int,
majority_pct: int = 50,
base_exponent: float = 0.1,
gradient_exponent: float = 0.2,
constant_base: float = 0.0,
) -> int:
"""Compute the minimum number of *for* votes required for adoption.
Parameters
----------
wot_size:
Size of the eligible voter corpus (WoT members).
total_votes:
Number of votes cast (for + against).
majority_pct:
Majority percentage (0-100). 50 = simple majority at full participation.
base_exponent:
B in the formula. ``B^W`` contributes a vanishingly small offset
when W is large (0 < B < 1).
gradient_exponent:
G controls how fast the required super-majority decays toward M as
participation increases.
constant_base:
C, a fixed additive floor on the threshold.
Returns
-------
int
The ceiling of the computed threshold. A vote passes when
``votes_for >= wot_threshold(...)``.
"""
if wot_size <= 0:
raise ValueError("wot_size doit etre strictement positif")
if total_votes < 0:
raise ValueError("total_votes ne peut pas etre negatif")
if not (0 <= majority_pct <= 100):
raise ValueError("majority_pct doit etre entre 0 et 100")
C = constant_base
B = base_exponent
W = wot_size
T = total_votes
M = majority_pct / 100.0
G = gradient_exponent
# Guard: if no votes, threshold is at least ceil(C + B^W)
if T == 0:
return math.ceil(C + B ** W)
# Core formula
participation_ratio = T / W
inertia_factor = 1.0 - participation_ratio ** G
required_ratio = M + (1.0 - M) * inertia_factor
result = C + B ** W + required_ratio * max(0.0, T - C)
return math.ceil(result)

44
backend/app/main.py Normal file
View File

@@ -0,0 +1,44 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.config import settings
from app.database import init_db
from app.routers import auth, documents, decisions, votes, mandates, protocols, sanctuary, websocket
@asynccontextmanager
async def lifespan(app: FastAPI):
await init_db()
yield
app = FastAPI(
title=settings.APP_NAME,
description="Plateforme de decisions collectives pour la communaute Duniter/G1",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.CORS_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router, prefix="/api/v1/auth", tags=["auth"])
app.include_router(documents.router, prefix="/api/v1/documents", tags=["documents"])
app.include_router(decisions.router, prefix="/api/v1/decisions", tags=["decisions"])
app.include_router(votes.router, prefix="/api/v1/votes", tags=["votes"])
app.include_router(mandates.router, prefix="/api/v1/mandates", tags=["mandates"])
app.include_router(protocols.router, prefix="/api/v1/protocols", tags=["protocols"])
app.include_router(sanctuary.router, prefix="/api/v1/sanctuary", tags=["sanctuary"])
app.include_router(websocket.router, prefix="/api/v1/ws", tags=["websocket"])
@app.get("/api/health")
async def health():
return {"status": "ok"}

View File

@@ -0,0 +1,19 @@
from app.models.user import DuniterIdentity, Session
from app.models.document import Document, DocumentItem, ItemVersion
from app.models.decision import Decision, DecisionStep
from app.models.vote import VoteSession, Vote
from app.models.mandate import Mandate, MandateStep
from app.models.protocol import VotingProtocol, FormulaConfig
from app.models.sanctuary import SanctuaryEntry
from app.models.cache import BlockchainCache
__all__ = [
"DuniterIdentity", "Session",
"Document", "DocumentItem", "ItemVersion",
"Decision", "DecisionStep",
"VoteSession", "Vote",
"Mandate", "MandateStep",
"VotingProtocol", "FormulaConfig",
"SanctuaryEntry",
"BlockchainCache",
]

View File

@@ -0,0 +1,18 @@
import uuid
from datetime import datetime
from sqlalchemy import String, DateTime, func
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class BlockchainCache(Base):
__tablename__ = "blockchain_cache"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
cache_key: Mapped[str] = mapped_column(String(256), unique=True, nullable=False, index=True)
cache_value: Mapped[dict] = mapped_column(JSONB, nullable=False)
fetched_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)

View File

@@ -0,0 +1,42 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Decision(Base):
__tablename__ = "decisions"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
title: Mapped[str] = mapped_column(String(256), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
context: Mapped[str | None] = mapped_column(Text)
decision_type: Mapped[str] = mapped_column(String(64), nullable=False) # runtime_upgrade, document_change, mandate_vote, custom
status: Mapped[str] = mapped_column(String(32), default="draft") # draft, qualification, review, voting, executed, closed
voting_protocol_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("voting_protocols.id"))
created_by_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("duniter_identities.id"))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
steps: Mapped[list["DecisionStep"]] = relationship(back_populates="decision", cascade="all, delete-orphan", order_by="DecisionStep.step_order")
class DecisionStep(Base):
__tablename__ = "decision_steps"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
decision_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("decisions.id"), nullable=False)
step_order: Mapped[int] = mapped_column(Integer, nullable=False)
step_type: Mapped[str] = mapped_column(String(32), nullable=False) # qualification, review, vote, execution, reporting
title: Mapped[str | None] = mapped_column(String(256))
description: Mapped[str | None] = mapped_column(Text)
status: Mapped[str] = mapped_column(String(32), default="pending") # pending, active, completed, skipped
vote_session_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("vote_sessions.id"))
outcome: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
decision: Mapped["Decision"] = relationship(back_populates="steps")

View File

@@ -0,0 +1,60 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Document(Base):
__tablename__ = "documents"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
slug: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
title: Mapped[str] = mapped_column(String(256), nullable=False)
doc_type: Mapped[str] = mapped_column(String(64), nullable=False) # licence, engagement, reglement, constitution
version: Mapped[str] = mapped_column(String(32), default="0.1.0")
status: Mapped[str] = mapped_column(String(32), default="draft") # draft, active, archived
description: Mapped[str | None] = mapped_column(Text)
ipfs_cid: Mapped[str | None] = mapped_column(String(128))
chain_anchor: Mapped[str | None] = mapped_column(String(128))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
items: Mapped[list["DocumentItem"]] = relationship(back_populates="document", cascade="all, delete-orphan", order_by="DocumentItem.position")
class DocumentItem(Base):
__tablename__ = "document_items"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
document_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("documents.id"), nullable=False)
position: Mapped[str] = mapped_column(String(16), nullable=False) # "1", "1.1", "3.2"
item_type: Mapped[str] = mapped_column(String(32), default="clause") # clause, rule, verification, preamble, section
title: Mapped[str | None] = mapped_column(String(256))
current_text: Mapped[str] = mapped_column(Text, nullable=False)
voting_protocol_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("voting_protocols.id"))
sort_order: Mapped[int] = mapped_column(Integer, default=0)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
document: Mapped["Document"] = relationship(back_populates="items")
versions: Mapped[list["ItemVersion"]] = relationship(back_populates="item", cascade="all, delete-orphan")
class ItemVersion(Base):
__tablename__ = "item_versions"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
item_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("document_items.id"), nullable=False)
proposed_text: Mapped[str] = mapped_column(Text, nullable=False)
diff_text: Mapped[str | None] = mapped_column(Text)
rationale: Mapped[str | None] = mapped_column(Text)
status: Mapped[str] = mapped_column(String(32), default="proposed") # proposed, voting, accepted, rejected
decision_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("decisions.id"))
proposed_by_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("duniter_identities.id"))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
item: Mapped["DocumentItem"] = relationship(back_populates="versions")

View File

@@ -0,0 +1,43 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Mandate(Base):
__tablename__ = "mandates"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
title: Mapped[str] = mapped_column(String(256), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
mandate_type: Mapped[str] = mapped_column(String(64), nullable=False) # techcomm, smith, custom
status: Mapped[str] = mapped_column(String(32), default="draft") # draft, candidacy, voting, active, reporting, completed, revoked
mandatee_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("duniter_identities.id"))
decision_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("decisions.id"))
starts_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
ends_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
steps: Mapped[list["MandateStep"]] = relationship(back_populates="mandate", cascade="all, delete-orphan", order_by="MandateStep.step_order")
class MandateStep(Base):
__tablename__ = "mandate_steps"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
mandate_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("mandates.id"), nullable=False)
step_order: Mapped[int] = mapped_column(Integer, nullable=False)
step_type: Mapped[str] = mapped_column(String(32), nullable=False) # formulation, candidacy, vote, assignment, reporting, completion, revocation
title: Mapped[str | None] = mapped_column(String(256))
description: Mapped[str | None] = mapped_column(Text)
status: Mapped[str] = mapped_column(String(32), default="pending") # pending, active, completed, skipped
vote_session_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("vote_sessions.id"))
outcome: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
mandate: Mapped["Mandate"] = relationship(back_populates="steps")

View File

@@ -0,0 +1,52 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Float, Boolean, DateTime, ForeignKey, Text, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class FormulaConfig(Base):
__tablename__ = "formula_configs"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(128), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
# WoT threshold params
duration_days: Mapped[int] = mapped_column(Integer, default=30)
majority_pct: Mapped[int] = mapped_column(Integer, default=50)
base_exponent: Mapped[float] = mapped_column(Float, default=0.1)
gradient_exponent: Mapped[float] = mapped_column(Float, default=0.2)
constant_base: Mapped[float] = mapped_column(Float, default=0.0)
# Smith criterion
smith_exponent: Mapped[float | None] = mapped_column(Float)
# TechComm criterion
techcomm_exponent: Mapped[float | None] = mapped_column(Float)
# Nuanced vote
nuanced_min_participants: Mapped[int | None] = mapped_column(Integer)
nuanced_threshold_pct: Mapped[int | None] = mapped_column(Integer)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
protocols: Mapped[list["VotingProtocol"]] = relationship(back_populates="formula_config")
class VotingProtocol(Base):
__tablename__ = "voting_protocols"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name: Mapped[str] = mapped_column(String(128), nullable=False)
description: Mapped[str | None] = mapped_column(Text)
vote_type: Mapped[str] = mapped_column(String(32), nullable=False) # binary, nuanced
formula_config_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("formula_configs.id"), nullable=False)
mode_params: Mapped[str | None] = mapped_column(String(64)) # e.g. "D30M50B.1G.2T.1"
is_meta_governed: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
formula_config: Mapped["FormulaConfig"] = relationship(back_populates="protocols")

View File

@@ -0,0 +1,23 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Text, DateTime, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class SanctuaryEntry(Base):
__tablename__ = "sanctuary_entries"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
entry_type: Mapped[str] = mapped_column(String(64), nullable=False) # document, decision, vote_result
reference_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), nullable=False)
title: Mapped[str | None] = mapped_column(String(256))
content_hash: Mapped[str] = mapped_column(String(128), nullable=False) # SHA-256
ipfs_cid: Mapped[str | None] = mapped_column(String(128))
chain_tx_hash: Mapped[str | None] = mapped_column(String(128))
chain_block: Mapped[int | None] = mapped_column(Integer)
metadata_json: Mapped[str | None] = mapped_column(Text) # JSON string for extra data
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

View File

@@ -0,0 +1,35 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class DuniterIdentity(Base):
__tablename__ = "duniter_identities"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
address: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True)
display_name: Mapped[str | None] = mapped_column(String(128))
wot_status: Mapped[str] = mapped_column(String(32), default="unknown") # member, pending, revoked, unknown
is_smith: Mapped[bool] = mapped_column(Boolean, default=False)
is_techcomm: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
sessions: Mapped[list["Session"]] = relationship(back_populates="identity", cascade="all, delete-orphan")
class Session(Base):
__tablename__ = "sessions"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
token_hash: Mapped[str] = mapped_column(String(128), unique=True, nullable=False, index=True)
identity_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("duniter_identities.id"), nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
identity: Mapped["DuniterIdentity"] = relationship(back_populates="sessions")

View File

@@ -0,0 +1,71 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, Float, Boolean, Text, DateTime, ForeignKey, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class VoteSession(Base):
__tablename__ = "vote_sessions"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
decision_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("decisions.id"))
item_version_id: Mapped[uuid.UUID | None] = mapped_column(ForeignKey("item_versions.id"))
voting_protocol_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("voting_protocols.id"), nullable=False)
# Snapshot at session start
wot_size: Mapped[int] = mapped_column(Integer, default=0)
smith_size: Mapped[int] = mapped_column(Integer, default=0)
techcomm_size: Mapped[int] = mapped_column(Integer, default=0)
# Dates
starts_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
ends_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)
# Status
status: Mapped[str] = mapped_column(String(32), default="open") # open, closed, tallied
# Tallies
votes_for: Mapped[int] = mapped_column(Integer, default=0)
votes_against: Mapped[int] = mapped_column(Integer, default=0)
votes_total: Mapped[int] = mapped_column(Integer, default=0)
smith_votes_for: Mapped[int] = mapped_column(Integer, default=0)
techcomm_votes_for: Mapped[int] = mapped_column(Integer, default=0)
threshold_required: Mapped[float] = mapped_column(Float, default=0.0)
result: Mapped[str | None] = mapped_column(String(32)) # adopted, rejected, null
# Chain recording
chain_recorded: Mapped[bool] = mapped_column(Boolean, default=False)
chain_tx_hash: Mapped[str | None] = mapped_column(String(128))
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
votes: Mapped[list["Vote"]] = relationship(back_populates="session", cascade="all, delete-orphan")
class Vote(Base):
__tablename__ = "votes"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
session_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("vote_sessions.id"), nullable=False)
voter_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("duniter_identities.id"), nullable=False)
vote_value: Mapped[str] = mapped_column(String(32), nullable=False) # for, against, or nuanced levels
nuanced_level: Mapped[int | None] = mapped_column(Integer) # 0-5 for nuanced votes
comment: Mapped[str | None] = mapped_column(Text)
# Cryptographic proof
signature: Mapped[str] = mapped_column(Text, nullable=False)
signed_payload: Mapped[str] = mapped_column(Text, nullable=False)
# Voter status snapshot
voter_wot_status: Mapped[str] = mapped_column(String(32), default="member")
voter_is_smith: Mapped[bool] = mapped_column(Boolean, default=False)
voter_is_techcomm: Mapped[bool] = mapped_column(Boolean, default=False)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
session: Mapped["VoteSession"] = relationship(back_populates="votes")

View File

162
backend/app/routers/auth.py Normal file
View File

@@ -0,0 +1,162 @@
"""Auth router: Ed25519 challenge-response authentication for Duniter V2 identities."""
from __future__ import annotations
import secrets
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity
from app.schemas.auth import (
ChallengeRequest,
ChallengeResponse,
IdentityOut,
TokenResponse,
VerifyRequest,
)
from app.services.auth_service import (
create_session,
get_current_identity,
get_or_create_identity,
invalidate_session,
)
router = APIRouter()
# ── In-memory challenge store (short-lived, no persistence needed) ──────────
# Structure: { address: { "challenge": str, "expires_at": datetime } }
_pending_challenges: dict[str, dict] = {}
def _cleanup_expired_challenges() -> None:
"""Remove expired challenges from the in-memory store."""
now = datetime.now(timezone.utc)
expired = [addr for addr, data in _pending_challenges.items() if data["expires_at"] < now]
for addr in expired:
del _pending_challenges[addr]
# ── Routes ──────────────────────────────────────────────────────────────────
@router.post("/challenge", response_model=ChallengeResponse)
async def request_challenge(payload: ChallengeRequest) -> ChallengeResponse:
"""Generate a random Ed25519 challenge for the given Duniter address.
The client must sign this challenge with the private key corresponding
to the address, then submit it via POST /verify.
"""
_cleanup_expired_challenges()
challenge = secrets.token_hex(32)
expires_at = datetime.now(timezone.utc) + timedelta(seconds=settings.CHALLENGE_EXPIRE_SECONDS)
_pending_challenges[payload.address] = {
"challenge": challenge,
"expires_at": expires_at,
}
return ChallengeResponse(challenge=challenge, expires_at=expires_at)
@router.post("/verify", response_model=TokenResponse)
async def verify_challenge(
payload: VerifyRequest,
db: AsyncSession = Depends(get_db),
) -> TokenResponse:
"""Verify the Ed25519 signature of a challenge and return a session token.
Steps:
1. Check that a pending challenge exists for the address.
2. Verify the challenge string matches.
3. Verify the Ed25519 signature against the address public key.
4. Create or retrieve the DuniterIdentity.
5. Create a session and return the bearer token.
"""
# 1. Retrieve pending challenge
pending = _pending_challenges.get(payload.address)
if pending is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Aucun challenge en attente pour cette adresse",
)
# 2. Check expiry
if pending["expires_at"] < datetime.now(timezone.utc):
del _pending_challenges[payload.address]
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Challenge expire, veuillez en demander un nouveau",
)
# 3. Verify challenge string matches
if pending["challenge"] != payload.challenge:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Challenge invalide",
)
# 4. Verify Ed25519 signature
# TODO: Implement actual Ed25519 verification using substrate-interface
# For now we accept any signature to allow development/testing.
# In production this MUST verify: verify(address_pubkey, challenge_bytes, signature_bytes)
#
# from substrateinterface import Keypair
# keypair = Keypair(ss58_address=payload.address)
# if not keypair.verify(payload.challenge.encode(), bytes.fromhex(payload.signature)):
# raise HTTPException(status_code=401, detail="Signature invalide")
# 5. Consume the challenge
del _pending_challenges[payload.address]
# 6. Get or create identity
identity = await get_or_create_identity(db, payload.address)
# 7. Create session token
token = await create_session(db, identity)
return TokenResponse(
token=token,
identity=IdentityOut.model_validate(identity),
)
@router.get("/me", response_model=IdentityOut)
async def get_me(
identity: DuniterIdentity = Depends(get_current_identity),
) -> IdentityOut:
"""Return the currently authenticated identity."""
return IdentityOut.model_validate(identity)
@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT)
async def logout(
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> None:
"""Invalidate the current session token.
Note: get_current_identity already validated the token, so we know it exists.
We re-extract it from the Authorization header to invalidate it.
"""
# We need the raw token to invalidate -- re-extract from the dependency chain.
# Since get_current_identity already validated, we know the request has a valid Bearer token.
# We use a slightly different approach: delete all sessions for this identity
# that match. For a cleaner approach, we accept the token via a dedicated dependency.
from fastapi import Request
# This is handled by getting the token from the auth service
# For simplicity, we delete all sessions for the identity
from sqlalchemy import select
from app.models.user import Session
result = await db.execute(select(Session).where(Session.identity_id == identity.id))
sessions = result.scalars().all()
for session in sessions:
await db.delete(session)
await db.commit()

View File

@@ -0,0 +1,143 @@
"""Decisions router: CRUD for decision processes and their steps."""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.decision import Decision, DecisionStep
from app.models.user import DuniterIdentity
from app.schemas.decision import (
DecisionCreate,
DecisionOut,
DecisionStepCreate,
DecisionStepOut,
DecisionUpdate,
)
from app.services.auth_service import get_current_identity
router = APIRouter()
# ── Helpers ─────────────────────────────────────────────────────────────────
async def _get_decision(db: AsyncSession, decision_id: uuid.UUID) -> Decision:
"""Fetch a decision by ID with its steps eagerly loaded, or raise 404."""
result = await db.execute(
select(Decision)
.options(selectinload(Decision.steps))
.where(Decision.id == decision_id)
)
decision = result.scalar_one_or_none()
if decision is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Decision introuvable")
return decision
# ── Decision routes ─────────────────────────────────────────────────────────
@router.get("/", response_model=list[DecisionOut])
async def list_decisions(
db: AsyncSession = Depends(get_db),
decision_type: str | None = Query(default=None, description="Filtrer par type de decision"),
status_filter: str | None = Query(default=None, alias="status", description="Filtrer par statut"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[DecisionOut]:
"""List all decisions with optional filters."""
stmt = select(Decision).options(selectinload(Decision.steps))
if decision_type is not None:
stmt = stmt.where(Decision.decision_type == decision_type)
if status_filter is not None:
stmt = stmt.where(Decision.status == status_filter)
stmt = stmt.order_by(Decision.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
decisions = result.scalars().unique().all()
return [DecisionOut.model_validate(d) for d in decisions]
@router.post("/", response_model=DecisionOut, status_code=status.HTTP_201_CREATED)
async def create_decision(
payload: DecisionCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DecisionOut:
"""Create a new decision process."""
decision = Decision(
**payload.model_dump(),
created_by_id=identity.id,
)
db.add(decision)
await db.commit()
await db.refresh(decision)
# Reload with steps (empty at creation)
decision = await _get_decision(db, decision.id)
return DecisionOut.model_validate(decision)
@router.get("/{id}", response_model=DecisionOut)
async def get_decision(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> DecisionOut:
"""Get a single decision with all its steps."""
decision = await _get_decision(db, id)
return DecisionOut.model_validate(decision)
@router.put("/{id}", response_model=DecisionOut)
async def update_decision(
id: uuid.UUID,
payload: DecisionUpdate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DecisionOut:
"""Update a decision's metadata (title, description, status, protocol)."""
decision = await _get_decision(db, id)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(decision, field, value)
await db.commit()
await db.refresh(decision)
# Reload with steps
decision = await _get_decision(db, decision.id)
return DecisionOut.model_validate(decision)
# ── Decision Step routes ────────────────────────────────────────────────────
@router.post("/{id}/steps", response_model=DecisionStepOut, status_code=status.HTTP_201_CREATED)
async def add_step(
id: uuid.UUID,
payload: DecisionStepCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DecisionStepOut:
"""Add a step to a decision process."""
# Verify decision exists
decision = await _get_decision(db, id)
step = DecisionStep(
decision_id=decision.id,
**payload.model_dump(),
)
db.add(step)
await db.commit()
await db.refresh(step)
return DecisionStepOut.model_validate(step)

View File

@@ -0,0 +1,262 @@
"""Documents router: CRUD for reference documents, items, and item versions."""
from __future__ import annotations
import difflib
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.document import Document, DocumentItem, ItemVersion
from app.models.user import DuniterIdentity
from app.schemas.document import (
DocumentCreate,
DocumentItemCreate,
DocumentItemOut,
DocumentOut,
DocumentUpdate,
ItemVersionCreate,
ItemVersionOut,
)
from app.services.auth_service import get_current_identity
router = APIRouter()
# ── Helpers ─────────────────────────────────────────────────────────────────
async def _get_document_by_slug(db: AsyncSession, slug: str) -> Document:
"""Fetch a document by slug or raise 404."""
result = await db.execute(select(Document).where(Document.slug == slug))
doc = result.scalar_one_or_none()
if doc is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Document introuvable")
return doc
async def _get_item(db: AsyncSession, document_id: uuid.UUID, item_id: uuid.UUID) -> DocumentItem:
"""Fetch a document item by ID within a document, or raise 404."""
result = await db.execute(
select(DocumentItem).where(
DocumentItem.id == item_id,
DocumentItem.document_id == document_id,
)
)
item = result.scalar_one_or_none()
if item is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Item introuvable")
return item
# ── Document routes ─────────────────────────────────────────────────────────
@router.get("/", response_model=list[DocumentOut])
async def list_documents(
db: AsyncSession = Depends(get_db),
doc_type: str | None = Query(default=None, description="Filtrer par type de document"),
status_filter: str | None = Query(default=None, alias="status", description="Filtrer par statut"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[DocumentOut]:
"""List all reference documents, with optional filters."""
stmt = select(Document)
if doc_type is not None:
stmt = stmt.where(Document.doc_type == doc_type)
if status_filter is not None:
stmt = stmt.where(Document.status == status_filter)
stmt = stmt.order_by(Document.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
documents = result.scalars().all()
# Compute items_count for each document
out = []
for doc in documents:
count_result = await db.execute(
select(func.count()).select_from(DocumentItem).where(DocumentItem.document_id == doc.id)
)
items_count = count_result.scalar() or 0
doc_out = DocumentOut.model_validate(doc)
doc_out.items_count = items_count
out.append(doc_out)
return out
@router.post("/", response_model=DocumentOut, status_code=status.HTTP_201_CREATED)
async def create_document(
payload: DocumentCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DocumentOut:
"""Create a new reference document."""
# Check slug uniqueness
existing = await db.execute(select(Document).where(Document.slug == payload.slug))
if existing.scalar_one_or_none() is not None:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Un document avec ce slug existe deja",
)
doc = Document(**payload.model_dump())
db.add(doc)
await db.commit()
await db.refresh(doc)
doc_out = DocumentOut.model_validate(doc)
doc_out.items_count = 0
return doc_out
@router.get("/{slug}", response_model=DocumentOut)
async def get_document(
slug: str,
db: AsyncSession = Depends(get_db),
) -> DocumentOut:
"""Get a single document by its slug."""
doc = await _get_document_by_slug(db, slug)
count_result = await db.execute(
select(func.count()).select_from(DocumentItem).where(DocumentItem.document_id == doc.id)
)
items_count = count_result.scalar() or 0
doc_out = DocumentOut.model_validate(doc)
doc_out.items_count = items_count
return doc_out
@router.put("/{slug}", response_model=DocumentOut)
async def update_document(
slug: str,
payload: DocumentUpdate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DocumentOut:
"""Update a document's metadata (title, status, description, version)."""
doc = await _get_document_by_slug(db, slug)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(doc, field, value)
await db.commit()
await db.refresh(doc)
count_result = await db.execute(
select(func.count()).select_from(DocumentItem).where(DocumentItem.document_id == doc.id)
)
items_count = count_result.scalar() or 0
doc_out = DocumentOut.model_validate(doc)
doc_out.items_count = items_count
return doc_out
# ── Document Item routes ────────────────────────────────────────────────────
@router.post("/{slug}/items", response_model=DocumentItemOut, status_code=status.HTTP_201_CREATED)
async def add_item(
slug: str,
payload: DocumentItemCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> DocumentItemOut:
"""Add a new item (clause, rule, etc.) to a document."""
doc = await _get_document_by_slug(db, slug)
# Determine sort_order: max existing + 1
max_order_result = await db.execute(
select(func.max(DocumentItem.sort_order)).where(DocumentItem.document_id == doc.id)
)
max_order = max_order_result.scalar() or 0
item = DocumentItem(
document_id=doc.id,
sort_order=max_order + 1,
**payload.model_dump(),
)
db.add(item)
await db.commit()
await db.refresh(item)
return DocumentItemOut.model_validate(item)
@router.get("/{slug}/items", response_model=list[DocumentItemOut])
async def list_items(
slug: str,
db: AsyncSession = Depends(get_db),
) -> list[DocumentItemOut]:
"""List all items in a document, ordered by sort_order."""
doc = await _get_document_by_slug(db, slug)
result = await db.execute(
select(DocumentItem)
.where(DocumentItem.document_id == doc.id)
.order_by(DocumentItem.sort_order)
)
items = result.scalars().all()
return [DocumentItemOut.model_validate(item) for item in items]
@router.get("/{slug}/items/{item_id}", response_model=DocumentItemOut)
async def get_item(
slug: str,
item_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> DocumentItemOut:
"""Get a single item with its version history."""
doc = await _get_document_by_slug(db, slug)
item = await _get_item(db, doc.id, item_id)
return DocumentItemOut.model_validate(item)
@router.post(
"/{slug}/items/{item_id}/versions",
response_model=ItemVersionOut,
status_code=status.HTTP_201_CREATED,
)
async def propose_version(
slug: str,
item_id: uuid.UUID,
payload: ItemVersionCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> ItemVersionOut:
"""Propose a new version of a document item.
Automatically computes a unified diff between the current text and the proposed text.
"""
doc = await _get_document_by_slug(db, slug)
item = await _get_item(db, doc.id, item_id)
# Compute diff
diff_lines = difflib.unified_diff(
item.current_text.splitlines(keepends=True),
payload.proposed_text.splitlines(keepends=True),
fromfile="actuel",
tofile="propose",
)
diff_text = "".join(diff_lines) or None
version = ItemVersion(
item_id=item.id,
proposed_text=payload.proposed_text,
diff_text=diff_text,
rationale=payload.rationale,
proposed_by_id=identity.id,
)
db.add(version)
await db.commit()
await db.refresh(version)
return ItemVersionOut.model_validate(version)

View File

@@ -0,0 +1,167 @@
"""Mandates router: CRUD for mandates and their steps."""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.mandate import Mandate, MandateStep
from app.models.user import DuniterIdentity
from app.schemas.mandate import (
MandateCreate,
MandateOut,
MandateStepCreate,
MandateStepOut,
)
from app.services.auth_service import get_current_identity
router = APIRouter()
# ── Helpers ─────────────────────────────────────────────────────────────────
async def _get_mandate(db: AsyncSession, mandate_id: uuid.UUID) -> Mandate:
"""Fetch a mandate by ID with its steps eagerly loaded, or raise 404."""
result = await db.execute(
select(Mandate)
.options(selectinload(Mandate.steps))
.where(Mandate.id == mandate_id)
)
mandate = result.scalar_one_or_none()
if mandate is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Mandat introuvable")
return mandate
# ── Mandate routes ──────────────────────────────────────────────────────────
@router.get("/", response_model=list[MandateOut])
async def list_mandates(
db: AsyncSession = Depends(get_db),
mandate_type: str | None = Query(default=None, description="Filtrer par type de mandat"),
status_filter: str | None = Query(default=None, alias="status", description="Filtrer par statut"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[MandateOut]:
"""List all mandates with optional filters."""
stmt = select(Mandate).options(selectinload(Mandate.steps))
if mandate_type is not None:
stmt = stmt.where(Mandate.mandate_type == mandate_type)
if status_filter is not None:
stmt = stmt.where(Mandate.status == status_filter)
stmt = stmt.order_by(Mandate.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
mandates = result.scalars().unique().all()
return [MandateOut.model_validate(m) for m in mandates]
@router.post("/", response_model=MandateOut, status_code=status.HTTP_201_CREATED)
async def create_mandate(
payload: MandateCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateOut:
"""Create a new mandate."""
mandate = Mandate(**payload.model_dump())
db.add(mandate)
await db.commit()
await db.refresh(mandate)
# Reload with steps (empty at creation)
mandate = await _get_mandate(db, mandate.id)
return MandateOut.model_validate(mandate)
@router.get("/{id}", response_model=MandateOut)
async def get_mandate(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> MandateOut:
"""Get a single mandate with all its steps."""
mandate = await _get_mandate(db, id)
return MandateOut.model_validate(mandate)
@router.put("/{id}", response_model=MandateOut)
async def update_mandate(
id: uuid.UUID,
payload: MandateCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateOut:
"""Update a mandate's metadata."""
mandate = await _get_mandate(db, id)
update_data = payload.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(mandate, field, value)
await db.commit()
await db.refresh(mandate)
# Reload with steps
mandate = await _get_mandate(db, mandate.id)
return MandateOut.model_validate(mandate)
@router.delete("/{id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_mandate(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> None:
"""Delete a mandate (only if in draft status)."""
mandate = await _get_mandate(db, id)
if mandate.status != "draft":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Seuls les mandats en brouillon peuvent etre supprimes",
)
await db.delete(mandate)
await db.commit()
# ── Mandate Step routes ─────────────────────────────────────────────────────
@router.post("/{id}/steps", response_model=MandateStepOut, status_code=status.HTTP_201_CREATED)
async def add_step(
id: uuid.UUID,
payload: MandateStepCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> MandateStepOut:
"""Add a step to a mandate process."""
mandate = await _get_mandate(db, id)
step = MandateStep(
mandate_id=mandate.id,
**payload.model_dump(),
)
db.add(step)
await db.commit()
await db.refresh(step)
return MandateStepOut.model_validate(step)
@router.get("/{id}/steps", response_model=list[MandateStepOut])
async def list_steps(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> list[MandateStepOut]:
"""List all steps for a mandate, ordered by step_order."""
mandate = await _get_mandate(db, id)
return [MandateStepOut.model_validate(s) for s in mandate.steps]

View File

@@ -0,0 +1,139 @@
"""Protocols router: voting protocols and formula configurations."""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.user import DuniterIdentity
from app.schemas.protocol import (
FormulaConfigCreate,
FormulaConfigOut,
VotingProtocolCreate,
VotingProtocolOut,
)
from app.services.auth_service import get_current_identity
router = APIRouter()
# ── Helpers ─────────────────────────────────────────────────────────────────
async def _get_protocol(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProtocol:
"""Fetch a voting protocol by ID with its formula config, or raise 404."""
result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == protocol_id)
)
protocol = result.scalar_one_or_none()
if protocol is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Protocole introuvable")
return protocol
# ── Voting Protocol routes ──────────────────────────────────────────────────
@router.get("/", response_model=list[VotingProtocolOut])
async def list_protocols(
db: AsyncSession = Depends(get_db),
vote_type: str | None = Query(default=None, description="Filtrer par type de vote (binary, nuanced)"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[VotingProtocolOut]:
"""List all voting protocols with their formula configurations."""
stmt = select(VotingProtocol).options(selectinload(VotingProtocol.formula_config))
if vote_type is not None:
stmt = stmt.where(VotingProtocol.vote_type == vote_type)
stmt = stmt.order_by(VotingProtocol.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
protocols = result.scalars().unique().all()
return [VotingProtocolOut.model_validate(p) for p in protocols]
@router.post("/", response_model=VotingProtocolOut, status_code=status.HTTP_201_CREATED)
async def create_protocol(
payload: VotingProtocolCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VotingProtocolOut:
"""Create a new voting protocol.
The formula_config_id must reference an existing FormulaConfig.
"""
# Verify formula config exists
fc_result = await db.execute(
select(FormulaConfig).where(FormulaConfig.id == payload.formula_config_id)
)
if fc_result.scalar_one_or_none() is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Configuration de formule introuvable",
)
protocol = VotingProtocol(**payload.model_dump())
db.add(protocol)
await db.commit()
await db.refresh(protocol)
# Reload with formula config
protocol = await _get_protocol(db, protocol.id)
return VotingProtocolOut.model_validate(protocol)
@router.get("/{id}", response_model=VotingProtocolOut)
async def get_protocol(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> VotingProtocolOut:
"""Get a single voting protocol with its formula configuration."""
protocol = await _get_protocol(db, id)
return VotingProtocolOut.model_validate(protocol)
# ── Formula Config routes ───────────────────────────────────────────────────
@router.get("/formulas", response_model=list[FormulaConfigOut])
async def list_formulas(
db: AsyncSession = Depends(get_db),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[FormulaConfigOut]:
"""List all formula configurations."""
stmt = (
select(FormulaConfig)
.order_by(FormulaConfig.created_at.desc())
.offset(skip)
.limit(limit)
)
result = await db.execute(stmt)
formulas = result.scalars().all()
return [FormulaConfigOut.model_validate(f) for f in formulas]
@router.post("/formulas", response_model=FormulaConfigOut, status_code=status.HTTP_201_CREATED)
async def create_formula(
payload: FormulaConfigCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> FormulaConfigOut:
"""Create a new formula configuration for WoT threshold computation."""
formula = FormulaConfig(**payload.model_dump())
db.add(formula)
await db.commit()
await db.refresh(formula)
return FormulaConfigOut.model_validate(formula)

View File

@@ -0,0 +1,73 @@
"""Sanctuary router: IPFS + on-chain anchoring entries."""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.sanctuary import SanctuaryEntry
from app.models.user import DuniterIdentity
from app.schemas.sanctuary import SanctuaryEntryCreate, SanctuaryEntryOut
from app.services.auth_service import get_current_identity
router = APIRouter()
@router.get("/", response_model=list[SanctuaryEntryOut])
async def list_entries(
db: AsyncSession = Depends(get_db),
entry_type: str | None = Query(default=None, description="Filtrer par type (document, decision, vote_result)"),
skip: int = Query(default=0, ge=0),
limit: int = Query(default=50, ge=1, le=200),
) -> list[SanctuaryEntryOut]:
"""List all sanctuary entries with optional type filter."""
stmt = select(SanctuaryEntry)
if entry_type is not None:
stmt = stmt.where(SanctuaryEntry.entry_type == entry_type)
stmt = stmt.order_by(SanctuaryEntry.created_at.desc()).offset(skip).limit(limit)
result = await db.execute(stmt)
entries = result.scalars().all()
return [SanctuaryEntryOut.model_validate(e) for e in entries]
@router.get("/{id}", response_model=SanctuaryEntryOut)
async def get_entry(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> SanctuaryEntryOut:
"""Get a single sanctuary entry by ID."""
result = await db.execute(select(SanctuaryEntry).where(SanctuaryEntry.id == id))
entry = result.scalar_one_or_none()
if entry is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Entree sanctuaire introuvable")
return SanctuaryEntryOut.model_validate(entry)
@router.post("/", response_model=SanctuaryEntryOut, status_code=status.HTTP_201_CREATED)
async def create_entry(
payload: SanctuaryEntryCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> SanctuaryEntryOut:
"""Create a new sanctuary entry.
This endpoint is typically called by internal services after:
1. Content is hashed (SHA-256)
2. Content is pinned to IPFS
3. Hash is anchored on-chain via system.remark
The IPFS CID and chain TX hash can be added later via updates.
"""
entry = SanctuaryEntry(**payload.model_dump())
db.add(entry)
await db.commit()
await db.refresh(entry)
return SanctuaryEntryOut.model_validate(entry)

View File

@@ -0,0 +1,306 @@
"""Votes router: vote sessions, individual votes, and result computation."""
from __future__ import annotations
import math
import uuid
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.database import get_db
from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.user import DuniterIdentity
from app.models.vote import Vote, VoteSession
from app.schemas.vote import VoteCreate, VoteOut, VoteSessionCreate, VoteSessionOut
from app.services.auth_service import get_current_identity
router = APIRouter()
# ── Helpers ─────────────────────────────────────────────────────────────────
async def _get_session(db: AsyncSession, session_id: uuid.UUID) -> VoteSession:
"""Fetch a vote session by ID with votes eagerly loaded, or raise 404."""
result = await db.execute(
select(VoteSession)
.options(selectinload(VoteSession.votes))
.where(VoteSession.id == session_id)
)
session = result.scalar_one_or_none()
if session is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Session de vote introuvable")
return session
async def _get_protocol_with_formula(db: AsyncSession, protocol_id: uuid.UUID) -> VotingProtocol:
"""Fetch a voting protocol with its formula config, or raise 404."""
result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == protocol_id)
)
protocol = result.scalar_one_or_none()
if protocol is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Protocole de vote introuvable")
return protocol
def _compute_threshold(formula: FormulaConfig, wot_size: int, votes_total: int) -> float:
"""Compute the WoT-based threshold using the core formula.
Result = C + B^W + (M + (1-M) * (1 - (T/W)^G)) * max(0, T-C)
Where:
- C = constant_base
- B = base_exponent
- W = wot_size
- M = majority_pct / 100
- G = gradient_exponent
- T = votes_total (turnout)
"""
c = formula.constant_base
b = formula.base_exponent
w = max(wot_size, 1)
m = formula.majority_pct / 100.0
g = formula.gradient_exponent
t = votes_total
# Inertia-based threshold
base_power = b ** w if b > 0 else 0.0
turnout_ratio = min(t / w, 1.0) if w > 0 else 0.0
inertia = m + (1 - m) * (1 - turnout_ratio ** g)
threshold = c + base_power + inertia * max(0, t - c)
return threshold
def _compute_result(
session: VoteSession,
formula: FormulaConfig,
) -> dict:
"""Compute the vote result based on tallies and formula.
Returns a dict with threshold_required, result ("adopted" or "rejected"),
and whether Smith/TechComm criteria are met.
"""
threshold = _compute_threshold(formula, session.wot_size, session.votes_total)
# Main criterion: votes_for >= threshold
main_pass = session.votes_for >= threshold
# Smith criterion (if configured)
smith_pass = True
smith_threshold = None
if formula.smith_exponent is not None and session.smith_size > 0:
smith_threshold = math.ceil(session.smith_size ** formula.smith_exponent)
smith_pass = session.smith_votes_for >= smith_threshold
# TechComm criterion (if configured)
techcomm_pass = True
techcomm_threshold = None
if formula.techcomm_exponent is not None and session.techcomm_size > 0:
techcomm_threshold = math.ceil(session.techcomm_size ** formula.techcomm_exponent)
techcomm_pass = session.techcomm_votes_for >= techcomm_threshold
result = "adopted" if (main_pass and smith_pass and techcomm_pass) else "rejected"
return {
"threshold_required": threshold,
"result": result,
"smith_threshold": smith_threshold,
"smith_pass": smith_pass,
"techcomm_threshold": techcomm_threshold,
"techcomm_pass": techcomm_pass,
}
# ── Routes ──────────────────────────────────────────────────────────────────
@router.post("/sessions", response_model=VoteSessionOut, status_code=status.HTTP_201_CREATED)
async def create_vote_session(
payload: VoteSessionCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteSessionOut:
"""Create a new vote session.
The session duration is derived from the linked protocol's formula config.
WoT/Smith/TechComm sizes should be snapshotted from the blockchain at creation time.
"""
# Validate protocol exists and get formula for duration
protocol = await _get_protocol_with_formula(db, payload.voting_protocol_id)
formula = protocol.formula_config
starts_at = datetime.now(timezone.utc)
ends_at = starts_at + timedelta(days=formula.duration_days)
session = VoteSession(
decision_id=payload.decision_id,
item_version_id=payload.item_version_id,
voting_protocol_id=payload.voting_protocol_id,
starts_at=starts_at,
ends_at=ends_at,
# TODO: Snapshot actual WoT sizes from blockchain via Duniter RPC
wot_size=0,
smith_size=0,
techcomm_size=0,
)
db.add(session)
await db.commit()
await db.refresh(session)
return VoteSessionOut.model_validate(session)
@router.get("/sessions/{id}", response_model=VoteSessionOut)
async def get_vote_session(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> VoteSessionOut:
"""Get a vote session with current tallies."""
session = await _get_session(db, id)
return VoteSessionOut.model_validate(session)
@router.post("/sessions/{id}/vote", response_model=VoteOut, status_code=status.HTTP_201_CREATED)
async def submit_vote(
id: uuid.UUID,
payload: VoteCreate,
db: AsyncSession = Depends(get_db),
identity: DuniterIdentity = Depends(get_current_identity),
) -> VoteOut:
"""Submit a vote to a session.
Each identity can only vote once per session. Submitting again replaces the previous vote.
The vote must include a cryptographic signature for on-chain proof.
"""
session = await _get_session(db, id)
# Verify session is open
if session.status != "open":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cette session de vote n'est pas ouverte",
)
# Verify session hasn't ended
if datetime.now(timezone.utc) > session.ends_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cette session de vote est terminee",
)
# Check if voter already voted -- replace if so
existing_result = await db.execute(
select(Vote).where(
Vote.session_id == session.id,
Vote.voter_id == identity.id,
)
)
existing_vote = existing_result.scalar_one_or_none()
if existing_vote is not None:
# Deactivate old vote (keep for audit trail)
existing_vote.is_active = False
# Update tallies: subtract old vote
session.votes_total -= 1
if existing_vote.vote_value == "for":
session.votes_for -= 1
if existing_vote.voter_is_smith:
session.smith_votes_for -= 1
if existing_vote.voter_is_techcomm:
session.techcomm_votes_for -= 1
elif existing_vote.vote_value == "against":
session.votes_against -= 1
# Create new vote
vote = Vote(
session_id=session.id,
voter_id=identity.id,
vote_value=payload.vote_value,
nuanced_level=payload.nuanced_level,
comment=payload.comment,
signature=payload.signature,
signed_payload=payload.signed_payload,
voter_wot_status=identity.wot_status,
voter_is_smith=identity.is_smith,
voter_is_techcomm=identity.is_techcomm,
)
db.add(vote)
# Update tallies: add new vote
session.votes_total += 1
if payload.vote_value == "for":
session.votes_for += 1
if identity.is_smith:
session.smith_votes_for += 1
if identity.is_techcomm:
session.techcomm_votes_for += 1
elif payload.vote_value == "against":
session.votes_against += 1
await db.commit()
await db.refresh(vote)
return VoteOut.model_validate(vote)
@router.get("/sessions/{id}/votes", response_model=list[VoteOut])
async def list_votes(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
active_only: bool = Query(default=True, description="Ne montrer que les votes actifs"),
) -> list[VoteOut]:
"""List all votes in a session."""
# Verify session exists
await _get_session(db, id)
stmt = select(Vote).where(Vote.session_id == id)
if active_only:
stmt = stmt.where(Vote.is_active.is_(True))
stmt = stmt.order_by(Vote.created_at.asc())
result = await db.execute(stmt)
votes = result.scalars().all()
return [VoteOut.model_validate(v) for v in votes]
@router.get("/sessions/{id}/result")
async def get_vote_result(
id: uuid.UUID,
db: AsyncSession = Depends(get_db),
) -> dict:
"""Compute and return the current result for a vote session.
Uses the WoT threshold formula linked through the voting protocol.
Returns current tallies, computed threshold, and whether the vote passes.
"""
session = await _get_session(db, id)
# Get the protocol and formula
protocol = await _get_protocol_with_formula(db, session.voting_protocol_id)
formula = protocol.formula_config
result_data = _compute_result(session, formula)
return {
"session_id": str(session.id),
"status": session.status,
"votes_for": session.votes_for,
"votes_against": session.votes_against,
"votes_total": session.votes_total,
"wot_size": session.wot_size,
"smith_size": session.smith_size,
"techcomm_size": session.techcomm_size,
"smith_votes_for": session.smith_votes_for,
"techcomm_votes_for": session.techcomm_votes_for,
**result_data,
}

View File

@@ -0,0 +1,140 @@
"""WebSocket router: live vote updates."""
from __future__ import annotations
import json
import uuid
from typing import Any
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
router = APIRouter()
# ── Connection manager ──────────────────────────────────────────────────────
class ConnectionManager:
"""Manages active WebSocket connections grouped by vote session ID."""
def __init__(self) -> None:
# session_id -> list of connected websockets
self._connections: dict[uuid.UUID, list[WebSocket]] = {}
async def connect(self, websocket: WebSocket, session_id: uuid.UUID) -> None:
"""Accept a WebSocket connection and register it for a vote session."""
await websocket.accept()
if session_id not in self._connections:
self._connections[session_id] = []
self._connections[session_id].append(websocket)
def disconnect(self, websocket: WebSocket, session_id: uuid.UUID) -> None:
"""Remove a WebSocket connection from the session group."""
if session_id in self._connections:
self._connections[session_id] = [
ws for ws in self._connections[session_id] if ws is not websocket
]
if not self._connections[session_id]:
del self._connections[session_id]
async def broadcast(self, session_id: uuid.UUID, data: dict[str, Any]) -> None:
"""Broadcast a message to all connections watching a given vote session."""
if session_id not in self._connections:
return
message = json.dumps(data, default=str)
dead: list[WebSocket] = []
for ws in self._connections[session_id]:
try:
await ws.send_text(message)
except Exception:
dead.append(ws)
# Clean up dead connections
for ws in dead:
self.disconnect(ws, session_id)
manager = ConnectionManager()
# ── WebSocket endpoint ──────────────────────────────────────────────────────
@router.websocket("/live")
async def live_updates(websocket: WebSocket) -> None:
"""WebSocket endpoint for live vote session updates.
The client connects and sends a JSON message with the session_id
they want to subscribe to:
{ "action": "subscribe", "session_id": "<uuid>" }
The server will then push vote update events to the client:
{ "event": "vote_update", "session_id": "...", "votes_for": N, "votes_against": N, "votes_total": N }
{ "event": "session_closed", "session_id": "...", "result": "adopted|rejected" }
The client can also unsubscribe:
{ "action": "unsubscribe", "session_id": "<uuid>" }
"""
await websocket.accept()
subscribed_sessions: set[uuid.UUID] = set()
try:
while True:
raw = await websocket.receive_text()
try:
data = json.loads(raw)
except json.JSONDecodeError:
await websocket.send_text(json.dumps({"error": "JSON invalide"}))
continue
action = data.get("action")
session_id_str = data.get("session_id")
if not action or not session_id_str:
await websocket.send_text(
json.dumps({"error": "Champs 'action' et 'session_id' requis"})
)
continue
try:
session_id = uuid.UUID(session_id_str)
except ValueError:
await websocket.send_text(json.dumps({"error": "session_id invalide"}))
continue
if action == "subscribe":
if session_id not in subscribed_sessions:
# Register this websocket in the manager for this session
if session_id not in manager._connections:
manager._connections[session_id] = []
manager._connections[session_id].append(websocket)
subscribed_sessions.add(session_id)
await websocket.send_text(
json.dumps({"event": "subscribed", "session_id": str(session_id)})
)
elif action == "unsubscribe":
if session_id in subscribed_sessions:
manager.disconnect(websocket, session_id)
subscribed_sessions.discard(session_id)
await websocket.send_text(
json.dumps({"event": "unsubscribed", "session_id": str(session_id)})
)
else:
await websocket.send_text(
json.dumps({"error": f"Action inconnue: {action}"})
)
except WebSocketDisconnect:
# Clean up all subscriptions for this client
for session_id in subscribed_sessions:
manager.disconnect(websocket, session_id)

View File

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Request schemas ──────────────────────────────────────────────
class ChallengeRequest(BaseModel):
"""Request a challenge nonce for Ed25519 authentication."""
address: str = Field(..., min_length=1, max_length=64, description="Duniter V2 SS58 address")
class VerifyRequest(BaseModel):
"""Submit the signed challenge to obtain a session token."""
address: str = Field(..., min_length=1, max_length=64)
signature: str = Field(..., description="Ed25519 signature of the challenge (hex)")
challenge: str = Field(..., description="The challenge string that was signed")
# ── Response schemas ─────────────────────────────────────────────
class ChallengeResponse(BaseModel):
"""Returned after requesting a challenge."""
challenge: str
expires_at: datetime
class IdentityOut(BaseModel):
"""Public representation of a Duniter identity."""
model_config = ConfigDict(from_attributes=True)
id: UUID
address: str
display_name: str | None = None
wot_status: str
is_smith: bool
is_techcomm: bool
class TokenResponse(BaseModel):
"""Returned after successful challenge verification."""
token: str
identity: IdentityOut

View File

@@ -0,0 +1,72 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Decision ─────────────────────────────────────────────────────
class DecisionStepCreate(BaseModel):
"""Payload for creating a step within a decision process."""
step_order: int = Field(..., ge=0)
step_type: str = Field(..., max_length=32, description="qualification, review, vote, execution, reporting")
title: str | None = Field(default=None, max_length=256)
description: str | None = None
class DecisionStepOut(BaseModel):
"""Full decision step representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
decision_id: UUID
step_order: int
step_type: str
title: str | None = None
description: str | None = None
status: str
vote_session_id: UUID | None = None
outcome: str | None = None
created_at: datetime
class DecisionCreate(BaseModel):
"""Payload for creating a new decision."""
title: str = Field(..., min_length=1, max_length=256)
description: str | None = None
context: str | None = None
decision_type: str = Field(..., max_length=64, description="runtime_upgrade, document_change, mandate_vote, custom")
voting_protocol_id: UUID | None = None
class DecisionUpdate(BaseModel):
"""Partial update for a decision."""
title: str | None = Field(default=None, max_length=256)
description: str | None = None
status: str | None = Field(default=None, max_length=32)
voting_protocol_id: UUID | None = None
class DecisionOut(BaseModel):
"""Full decision representation returned by the API."""
model_config = ConfigDict(from_attributes=True)
id: UUID
title: str
description: str | None = None
context: str | None = None
decision_type: str
status: str
voting_protocol_id: UUID | None = None
created_by_id: UUID | None = None
created_at: datetime
updated_at: datetime
steps: list[DecisionStepOut] = Field(default_factory=list)

View File

@@ -0,0 +1,103 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Document ─────────────────────────────────────────────────────
class DocumentCreate(BaseModel):
"""Payload for creating a new reference document."""
slug: str = Field(..., min_length=1, max_length=128)
title: str = Field(..., min_length=1, max_length=256)
doc_type: str = Field(..., max_length=64, description="licence, engagement, reglement, constitution")
description: str | None = None
version: str | None = Field(default="0.1.0", max_length=32)
class DocumentUpdate(BaseModel):
"""Partial update for a document."""
title: str | None = Field(default=None, max_length=256)
status: str | None = Field(default=None, max_length=32)
description: str | None = None
version: str | None = Field(default=None, max_length=32)
class DocumentOut(BaseModel):
"""Full document representation returned by the API."""
model_config = ConfigDict(from_attributes=True)
id: UUID
slug: str
title: str
doc_type: str
version: str
status: str
description: str | None = None
ipfs_cid: str | None = None
chain_anchor: str | None = None
created_at: datetime
updated_at: datetime
items_count: int = Field(default=0, description="Number of items in this document")
# ── Document Item ────────────────────────────────────────────────
class DocumentItemCreate(BaseModel):
"""Payload for creating a document item (clause, rule, etc.)."""
position: str = Field(..., max_length=16, description='Hierarchical position e.g. "1", "1.1", "3.2"')
item_type: str = Field(default="clause", max_length=32, description="clause, rule, verification, preamble, section")
title: str | None = Field(default=None, max_length=256)
current_text: str = Field(..., min_length=1)
voting_protocol_id: UUID | None = None
class DocumentItemOut(BaseModel):
"""Full document item representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
document_id: UUID
position: str
item_type: str
title: str | None = None
current_text: str
voting_protocol_id: UUID | None = None
sort_order: int
created_at: datetime
updated_at: datetime
# ── Item Version ─────────────────────────────────────────────────
class ItemVersionCreate(BaseModel):
"""Payload for proposing a new version of a document item."""
proposed_text: str = Field(..., min_length=1)
rationale: str | None = None
class ItemVersionOut(BaseModel):
"""Full item version representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
item_id: UUID
proposed_text: str
diff_text: str | None = None
rationale: str | None = None
status: str
decision_id: UUID | None = None
proposed_by_id: UUID | None = None
created_at: datetime

View File

@@ -0,0 +1,70 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Mandate Step ─────────────────────────────────────────────────
class MandateStepCreate(BaseModel):
"""Payload for creating a step within a mandate process."""
step_order: int = Field(..., ge=0)
step_type: str = Field(
...,
max_length=32,
description="formulation, candidacy, vote, assignment, reporting, completion, revocation",
)
title: str | None = Field(default=None, max_length=256)
description: str | None = None
class MandateStepOut(BaseModel):
"""Full mandate step representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
mandate_id: UUID
step_order: int
step_type: str
title: str | None = None
description: str | None = None
status: str
vote_session_id: UUID | None = None
outcome: str | None = None
created_at: datetime
# ── Mandate ──────────────────────────────────────────────────────
class MandateCreate(BaseModel):
"""Payload for creating a new mandate."""
title: str = Field(..., min_length=1, max_length=256)
description: str | None = None
mandate_type: str = Field(..., max_length=64, description="techcomm, smith, custom")
decision_id: UUID | None = None
class MandateOut(BaseModel):
"""Full mandate representation returned by the API."""
model_config = ConfigDict(from_attributes=True)
id: UUID
title: str
description: str | None = None
mandate_type: str
status: str
mandatee_id: UUID | None = None
decision_id: UUID | None = None
starts_at: datetime | None = None
ends_at: datetime | None = None
created_at: datetime
updated_at: datetime
steps: list[MandateStepOut] = Field(default_factory=list)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Formula Config ───────────────────────────────────────────────
class FormulaConfigCreate(BaseModel):
"""Payload for creating a WoT threshold formula configuration."""
name: str = Field(..., min_length=1, max_length=128)
description: str | None = None
# WoT threshold params
duration_days: int = Field(default=30, ge=1, description="Duration of the vote in days")
majority_pct: int = Field(default=50, ge=1, le=100, description="Majority percentage required")
base_exponent: float = Field(default=0.1, ge=0.0, le=1.0, description="Base exponent B in the formula")
gradient_exponent: float = Field(default=0.2, ge=0.0, le=2.0, description="Gradient exponent G in the formula")
constant_base: float = Field(default=0.0, ge=0.0, le=1.0, description="Constant base C in the formula")
# Smith criterion
smith_exponent: float | None = Field(default=None, ge=0.0, le=1.0, description="Smith criterion exponent S")
# TechComm criterion
techcomm_exponent: float | None = Field(default=None, ge=0.0, le=1.0, description="TechComm criterion exponent T")
# Nuanced vote
nuanced_min_participants: int | None = Field(default=None, ge=0, description="Minimum participants for nuanced vote")
nuanced_threshold_pct: int | None = Field(default=None, ge=0, le=100, description="Threshold percentage for nuanced vote")
class FormulaConfigOut(BaseModel):
"""Full formula configuration representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: str | None = None
duration_days: int
majority_pct: int
base_exponent: float
gradient_exponent: float
constant_base: float
smith_exponent: float | None = None
techcomm_exponent: float | None = None
nuanced_min_participants: int | None = None
nuanced_threshold_pct: int | None = None
created_at: datetime
# ── Voting Protocol ──────────────────────────────────────────────
class VotingProtocolCreate(BaseModel):
"""Payload for creating a voting protocol."""
name: str = Field(..., min_length=1, max_length=128)
description: str | None = None
vote_type: str = Field(..., max_length=32, description="binary, nuanced")
formula_config_id: UUID = Field(..., description="Reference to the formula configuration")
mode_params: str | None = Field(default=None, max_length=64, description='e.g. "D30M50B.1G.2T.1"')
is_meta_governed: bool = Field(default=False, description="Whether this protocol is itself governed by meta-vote")
class VotingProtocolOut(BaseModel):
"""Full voting protocol representation including formula config."""
model_config = ConfigDict(from_attributes=True)
id: UUID
name: str
description: str | None = None
vote_type: str
formula_config_id: UUID
mode_params: str | None = None
is_meta_governed: bool
created_at: datetime
formula_config: FormulaConfigOut

View File

@@ -0,0 +1,35 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Sanctuary Entry ──────────────────────────────────────────────
class SanctuaryEntryCreate(BaseModel):
"""Payload for creating a new sanctuary entry (IPFS + chain anchor)."""
entry_type: str = Field(..., max_length=64, description="document, decision, vote_result")
reference_id: UUID = Field(..., description="ID of the referenced entity")
title: str | None = Field(default=None, max_length=256)
content_hash: str = Field(..., max_length=128, description="SHA-256 hash of the content")
class SanctuaryEntryOut(BaseModel):
"""Full sanctuary entry representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
entry_type: str
reference_id: UUID
title: str | None = None
content_hash: str
ipfs_cid: str | None = None
chain_tx_hash: str | None = None
chain_block: int | None = None
metadata_json: str | None = None
created_at: datetime

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# ── Vote Session ─────────────────────────────────────────────────
class VoteSessionCreate(BaseModel):
"""Payload for opening a new vote session."""
decision_id: UUID | None = None
item_version_id: UUID | None = None
voting_protocol_id: UUID = Field(..., description="ID of the voting protocol to apply")
class VoteSessionOut(BaseModel):
"""Full vote session representation including tallies."""
model_config = ConfigDict(from_attributes=True)
id: UUID
decision_id: UUID | None = None
item_version_id: UUID | None = None
voting_protocol_id: UUID
# Snapshot at session start
wot_size: int
smith_size: int
techcomm_size: int
# Dates
starts_at: datetime
ends_at: datetime
# Status
status: str
# Tallies
votes_for: int
votes_against: int
votes_total: int
smith_votes_for: int
techcomm_votes_for: int
threshold_required: float
result: str | None = None
# Chain recording
chain_recorded: bool
chain_tx_hash: str | None = None
created_at: datetime
# ── Vote ─────────────────────────────────────────────────────────
class VoteCreate(BaseModel):
"""Payload for casting a vote (with cryptographic proof)."""
session_id: UUID
vote_value: str = Field(..., max_length=32, description="for, against, or nuanced level")
nuanced_level: int | None = Field(default=None, ge=0, le=5, description="0-5 for nuanced votes")
comment: str | None = None
signature: str = Field(..., description="Ed25519 signature of signed_payload")
signed_payload: str = Field(..., description="The exact payload that was signed")
class VoteOut(BaseModel):
"""Full vote representation."""
model_config = ConfigDict(from_attributes=True)
id: UUID
session_id: UUID
voter_id: UUID
vote_value: str
nuanced_level: int | None = None
comment: str | None = None
signature: str
signed_payload: str
voter_wot_status: str
voter_is_smith: bool
voter_is_techcomm: bool
is_active: bool
created_at: datetime

View File

View File

@@ -0,0 +1,96 @@
"""Authentication service: challenge generation, token management, current user resolution."""
from __future__ import annotations
import hashlib
import secrets
import uuid
from datetime import datetime, timedelta, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.database import get_db
from app.models.user import DuniterIdentity, Session
security = HTTPBearer(auto_error=False)
def _hash_token(token: str) -> str:
"""SHA-256 hash of a bearer token for storage."""
return hashlib.sha256(token.encode()).hexdigest()
async def create_session(db: AsyncSession, identity: DuniterIdentity) -> str:
"""Create a new session for the given identity, return the raw bearer token."""
raw_token = secrets.token_urlsafe(48)
token_hash = _hash_token(raw_token)
session = Session(
token_hash=token_hash,
identity_id=identity.id,
expires_at=datetime.now(timezone.utc) + timedelta(hours=settings.TOKEN_EXPIRE_HOURS),
)
db.add(session)
await db.commit()
return raw_token
async def invalidate_session(db: AsyncSession, token: str) -> None:
"""Delete the session matching the given raw token."""
token_hash = _hash_token(token)
result = await db.execute(select(Session).where(Session.token_hash == token_hash))
session = result.scalar_one_or_none()
if session:
await db.delete(session)
await db.commit()
async def get_current_identity(
db: AsyncSession = Depends(get_db),
credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> DuniterIdentity:
"""Dependency: resolve the current authenticated identity from the bearer token.
Raises 401 if the token is missing, invalid, or expired.
"""
if credentials is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentification requise")
token_hash = _hash_token(credentials.credentials)
result = await db.execute(
select(Session).where(
Session.token_hash == token_hash,
Session.expires_at > datetime.now(timezone.utc),
)
)
session = result.scalar_one_or_none()
if session is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide ou expire")
result = await db.execute(select(DuniterIdentity).where(DuniterIdentity.id == session.identity_id))
identity = result.scalar_one_or_none()
if identity is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Identite introuvable")
return identity
async def get_or_create_identity(db: AsyncSession, address: str) -> DuniterIdentity:
"""Get an existing identity by address or create a new one."""
result = await db.execute(select(DuniterIdentity).where(DuniterIdentity.address == address))
identity = result.scalar_one_or_none()
if identity is None:
identity = DuniterIdentity(address=address)
db.add(identity)
await db.commit()
await db.refresh(identity)
return identity

View File

@@ -0,0 +1,87 @@
"""Blockchain service: retrieve on-chain data from Duniter V2.
Provides functions to query WoT size, Smith sub-WoT size, and
Technical Committee size from the Duniter V2 blockchain.
Currently stubbed with hardcoded values matching GDev test data.
"""
from __future__ import annotations
async def get_wot_size() -> int:
"""Return the current number of WoT members.
TODO: Implement real RPC call using substrate-interface::
from substrateinterface import SubstrateInterface
from app.config import settings
substrate = SubstrateInterface(url=settings.DUNITER_RPC_URL)
# Query membership count
result = substrate.query(
module="Membership",
storage_function="MembershipCount",
)
return int(result.value)
Returns
-------
int
Number of WoT members. Currently returns 7224 (GDev snapshot).
"""
# TODO: Replace with real substrate-interface RPC call
return 7224
async def get_smith_size() -> int:
"""Return the current number of Smith members (forgerons).
TODO: Implement real RPC call using substrate-interface::
from substrateinterface import SubstrateInterface
from app.config import settings
substrate = SubstrateInterface(url=settings.DUNITER_RPC_URL)
# Query Smith membership count
result = substrate.query(
module="SmithMembers",
storage_function="SmithMembershipCount",
)
return int(result.value)
Returns
-------
int
Number of Smith members. Currently returns 20 (GDev snapshot).
"""
# TODO: Replace with real substrate-interface RPC call
return 20
async def get_techcomm_size() -> int:
"""Return the current number of Technical Committee members.
TODO: Implement real RPC call using substrate-interface::
from substrateinterface import SubstrateInterface
from app.config import settings
substrate = SubstrateInterface(url=settings.DUNITER_RPC_URL)
# Query TechComm member count
result = substrate.query(
module="TechnicalCommittee",
storage_function="Members",
)
return len(result.value) if result.value else 0
Returns
-------
int
Number of TechComm members. Currently returns 5 (GDev snapshot).
"""
# TODO: Replace with real substrate-interface RPC call
return 5

View File

@@ -0,0 +1,117 @@
"""Decision service: step advancement logic."""
from __future__ import annotations
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.decision import Decision, DecisionStep
# Valid status transitions for decisions
_DECISION_STATUS_ORDER = [
"draft",
"qualification",
"review",
"voting",
"executed",
"closed",
]
async def advance_decision(decision_id: uuid.UUID, db: AsyncSession) -> Decision:
"""Move a decision to its next step.
Completes the current active step and activates the next pending step.
If no more steps remain, the decision status advances to the next phase.
Parameters
----------
decision_id:
UUID of the Decision to advance.
db:
Async database session.
Returns
-------
Decision
The updated decision.
Raises
------
ValueError
If the decision is not found, or no further advancement is possible.
"""
result = await db.execute(
select(Decision)
.options(selectinload(Decision.steps))
.where(Decision.id == decision_id)
)
decision = result.scalar_one_or_none()
if decision is None:
raise ValueError(f"Decision introuvable : {decision_id}")
if decision.status == "closed":
raise ValueError("La decision est deja cloturee")
steps: list[DecisionStep] = sorted(decision.steps, key=lambda s: s.step_order)
# Find the current active step
active_step: DecisionStep | None = None
for step in steps:
if step.status == "active":
active_step = step
break
if active_step is not None:
# Complete the active step
active_step.status = "completed"
# Activate the next pending step
next_step: DecisionStep | None = None
for step in steps:
if step.step_order > active_step.step_order and step.status == "pending":
next_step = step
break
if next_step is not None:
next_step.status = "active"
else:
# No more steps: advance the decision status
_advance_decision_status(decision)
else:
# No active step: try to activate the first pending step
first_pending: DecisionStep | None = None
for step in steps:
if step.status == "pending":
first_pending = step
break
if first_pending is not None:
first_pending.status = "active"
# Also advance decision out of draft if needed
if decision.status == "draft":
decision.status = "qualification"
else:
# All steps are completed: advance the decision status
_advance_decision_status(decision)
await db.commit()
await db.refresh(decision)
return decision
def _advance_decision_status(decision: Decision) -> None:
"""Move a decision to its next status in the lifecycle."""
try:
current_index = _DECISION_STATUS_ORDER.index(decision.status)
except ValueError:
return
next_index = current_index + 1
if next_index < len(_DECISION_STATUS_ORDER):
decision.status = _DECISION_STATUS_ORDER[next_index]

View File

@@ -0,0 +1,108 @@
"""Document service: retrieval and version management."""
from __future__ import annotations
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.document import Document, DocumentItem, ItemVersion
async def get_document_with_items(slug: str, db: AsyncSession) -> Document | None:
"""Load a document with all its items and their versions, eagerly.
Parameters
----------
slug:
Unique slug of the document.
db:
Async database session.
Returns
-------
Document | None
The document with items and versions loaded, or None if not found.
"""
result = await db.execute(
select(Document)
.options(
selectinload(Document.items).selectinload(DocumentItem.versions)
)
.where(Document.slug == slug)
)
return result.scalar_one_or_none()
async def apply_version(
item_id: uuid.UUID,
version_id: uuid.UUID,
db: AsyncSession,
) -> DocumentItem:
"""Apply an accepted version to a document item.
This replaces the item's current_text with the version's proposed_text
and marks the version as 'accepted'.
Parameters
----------
item_id:
UUID of the DocumentItem to update.
version_id:
UUID of the ItemVersion to apply.
db:
Async database session.
Returns
-------
DocumentItem
The updated document item.
Raises
------
ValueError
If the item or version is not found, or the version does not
belong to the item.
"""
# Load item
item_result = await db.execute(
select(DocumentItem).where(DocumentItem.id == item_id)
)
item = item_result.scalar_one_or_none()
if item is None:
raise ValueError(f"Element de document introuvable : {item_id}")
# Load version
version_result = await db.execute(
select(ItemVersion).where(ItemVersion.id == version_id)
)
version = version_result.scalar_one_or_none()
if version is None:
raise ValueError(f"Version introuvable : {version_id}")
if version.item_id != item.id:
raise ValueError(
f"La version {version_id} n'appartient pas a l'element {item_id}"
)
# Apply the version
item.current_text = version.proposed_text
version.status = "accepted"
# Mark all other pending/voting versions for this item as rejected
other_versions_result = await db.execute(
select(ItemVersion).where(
ItemVersion.item_id == item_id,
ItemVersion.id != version_id,
ItemVersion.status.in_(["proposed", "voting"]),
)
)
for other_version in other_versions_result.scalars():
other_version.status = "rejected"
await db.commit()
await db.refresh(item)
return item

View File

@@ -0,0 +1,118 @@
"""Mandate service: step advancement logic."""
from __future__ import annotations
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.mandate import Mandate, MandateStep
# Valid status transitions for mandates
_MANDATE_STATUS_ORDER = [
"draft",
"candidacy",
"voting",
"active",
"reporting",
"completed",
]
async def advance_mandate(mandate_id: uuid.UUID, db: AsyncSession) -> Mandate:
"""Move a mandate to its next step.
Completes the current active step and activates the next pending step.
If no more steps remain, the mandate status advances to the next phase.
Parameters
----------
mandate_id:
UUID of the Mandate to advance.
db:
Async database session.
Returns
-------
Mandate
The updated mandate.
Raises
------
ValueError
If the mandate is not found, already completed/revoked, or
no further advancement is possible.
"""
result = await db.execute(
select(Mandate)
.options(selectinload(Mandate.steps))
.where(Mandate.id == mandate_id)
)
mandate = result.scalar_one_or_none()
if mandate is None:
raise ValueError(f"Mandat introuvable : {mandate_id}")
if mandate.status in ("completed", "revoked"):
raise ValueError(f"Le mandat est deja en statut terminal : {mandate.status}")
steps: list[MandateStep] = sorted(mandate.steps, key=lambda s: s.step_order)
# Find the current active step
active_step: MandateStep | None = None
for step in steps:
if step.status == "active":
active_step = step
break
if active_step is not None:
# Complete the active step
active_step.status = "completed"
# Activate the next pending step
next_step: MandateStep | None = None
for step in steps:
if step.step_order > active_step.step_order and step.status == "pending":
next_step = step
break
if next_step is not None:
next_step.status = "active"
else:
# No more steps: advance mandate status
_advance_mandate_status(mandate)
else:
# No active step: activate the first pending one
first_pending: MandateStep | None = None
for step in steps:
if step.status == "pending":
first_pending = step
break
if first_pending is not None:
first_pending.status = "active"
# Move out of draft
if mandate.status == "draft":
mandate.status = "candidacy"
else:
# All steps completed: advance status
_advance_mandate_status(mandate)
await db.commit()
await db.refresh(mandate)
return mandate
def _advance_mandate_status(mandate: Mandate) -> None:
"""Move a mandate to its next status in the lifecycle."""
try:
current_index = _MANDATE_STATUS_ORDER.index(mandate.status)
except ValueError:
return
next_index = current_index + 1
if next_index < len(_MANDATE_STATUS_ORDER):
mandate.status = _MANDATE_STATUS_ORDER[next_index]

View File

@@ -0,0 +1,123 @@
"""Sanctuary service: immutable archival to IPFS + on-chain hash.
The sanctuary is the immutable layer of Glibredecision. Every adopted
document version, decision result, or vote tally is hashed (SHA-256),
stored on IPFS, and anchored on-chain via system.remark.
"""
from __future__ import annotations
import hashlib
import json
import uuid
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.sanctuary import SanctuaryEntry
async def archive_to_sanctuary(
entry_type: str,
reference_id: uuid.UUID,
content: str,
title: str,
db: AsyncSession,
) -> SanctuaryEntry:
"""Hash content and create a sanctuary entry.
Parameters
----------
entry_type:
Type of the archived entity (``"document"``, ``"decision"``,
``"vote_result"``).
reference_id:
UUID of the source entity (document, decision, or vote session).
content:
The full text content to archive and hash.
title:
Human-readable title for the archive entry.
db:
Async database session.
Returns
-------
SanctuaryEntry
The newly created sanctuary entry with content_hash set.
"""
# Compute SHA-256 hash of the content
content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
# Build metadata
metadata = {
"archived_at": datetime.now(timezone.utc).isoformat(),
"entry_type": entry_type,
"content_length": len(content),
}
entry = SanctuaryEntry(
entry_type=entry_type,
reference_id=reference_id,
title=title,
content_hash=content_hash,
metadata_json=json.dumps(metadata, ensure_ascii=False),
)
# TODO: Upload content to IPFS via kubo HTTP API
# ipfs_cid = await _upload_to_ipfs(content)
# entry.ipfs_cid = ipfs_cid
# TODO: Anchor hash on-chain via system.remark
# tx_hash, block_number = await _anchor_on_chain(content_hash)
# entry.chain_tx_hash = tx_hash
# entry.chain_block = block_number
db.add(entry)
await db.commit()
await db.refresh(entry)
return entry
async def _upload_to_ipfs(content: str) -> str:
"""Upload content to IPFS via kubo HTTP API.
TODO: Implement using httpx against settings.IPFS_API_URL.
Example::
import httpx
from app.config import settings
async with httpx.AsyncClient() as client:
response = await client.post(
f"{settings.IPFS_API_URL}/api/v0/add",
files={"file": ("content.txt", content.encode("utf-8"))},
)
response.raise_for_status()
return response.json()["Hash"]
"""
raise NotImplementedError("IPFS upload pas encore implemente")
async def _anchor_on_chain(content_hash: str) -> tuple[str, int]:
"""Anchor a content hash on-chain via system.remark.
TODO: Implement using substrate-interface.
Example::
from substrateinterface import SubstrateInterface
from app.config import settings
substrate = SubstrateInterface(url=settings.DUNITER_RPC_URL)
call = substrate.compose_call(
call_module="System",
call_function="remark",
call_params={"remark": f"glibredecision:sanctuary:{content_hash}"},
)
extrinsic = substrate.create_signed_extrinsic(call=call, keypair=keypair)
receipt = substrate.submit_extrinsic(extrinsic, wait_for_inclusion=True)
return receipt.extrinsic_hash, receipt.block_number
"""
raise NotImplementedError("Ancrage on-chain pas encore implemente")

View File

@@ -0,0 +1,199 @@
"""Vote service: compute results and verify vote signatures."""
from __future__ import annotations
import uuid
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.engine.mode_params import parse_mode_params
from app.engine.nuanced_vote import evaluate_nuanced
from app.engine.smith_threshold import smith_threshold
from app.engine.techcomm_threshold import techcomm_threshold
from app.engine.threshold import wot_threshold
from app.models.protocol import FormulaConfig, VotingProtocol
from app.models.vote import Vote, VoteSession
async def compute_result(session_id: uuid.UUID, db: AsyncSession) -> dict:
"""Load a vote session, its protocol and formula, compute thresholds, and tally.
Parameters
----------
session_id:
UUID of the VoteSession to tally.
db:
Async database session.
Returns
-------
dict
Result dict with keys: threshold, votes_for, votes_against,
votes_total, adopted, smith_ok, techcomm_ok, details.
"""
# Load session with votes eagerly
result = await db.execute(
select(VoteSession)
.options(selectinload(VoteSession.votes))
.where(VoteSession.id == session_id)
)
session = result.scalar_one_or_none()
if session is None:
raise ValueError(f"Session de vote introuvable : {session_id}")
# Load protocol + formula config
proto_result = await db.execute(
select(VotingProtocol)
.options(selectinload(VotingProtocol.formula_config))
.where(VotingProtocol.id == session.voting_protocol_id)
)
protocol = proto_result.scalar_one_or_none()
if protocol is None:
raise ValueError(f"Protocole de vote introuvable pour la session {session_id}")
formula: FormulaConfig = protocol.formula_config
# If mode_params is set on the protocol, it overrides formula_config values
if protocol.mode_params:
params = parse_mode_params(protocol.mode_params)
else:
params = {
"majority_pct": formula.majority_pct,
"base_exponent": formula.base_exponent,
"gradient_exponent": formula.gradient_exponent,
"constant_base": formula.constant_base,
"smith_exponent": formula.smith_exponent,
"techcomm_exponent": formula.techcomm_exponent,
}
# Separate vote types
active_votes: list[Vote] = [v for v in session.votes if v.is_active]
if protocol.vote_type == "nuanced":
return await _compute_nuanced(session, active_votes, formula, params, db)
# --- Binary vote ---
votes_for = sum(1 for v in active_votes if v.vote_value == "for")
votes_against = sum(1 for v in active_votes if v.vote_value == "against")
total = votes_for + votes_against
# WoT threshold
threshold = wot_threshold(
wot_size=session.wot_size,
total_votes=total,
majority_pct=params.get("majority_pct", 50),
base_exponent=params.get("base_exponent", 0.1),
gradient_exponent=params.get("gradient_exponent", 0.2),
constant_base=params.get("constant_base", 0.0),
)
# Smith criterion (optional)
smith_ok = True
smith_required = None
if params.get("smith_exponent") is not None and session.smith_size > 0:
smith_required = smith_threshold(session.smith_size, params["smith_exponent"])
smith_votes = sum(1 for v in active_votes if v.voter_is_smith and v.vote_value == "for")
smith_ok = smith_votes >= smith_required
# TechComm criterion (optional)
techcomm_ok = True
techcomm_required = None
if params.get("techcomm_exponent") is not None and session.techcomm_size > 0:
techcomm_required = techcomm_threshold(session.techcomm_size, params["techcomm_exponent"])
techcomm_votes = sum(1 for v in active_votes if v.voter_is_techcomm and v.vote_value == "for")
techcomm_ok = techcomm_votes >= techcomm_required
adopted = votes_for >= threshold and smith_ok and techcomm_ok
vote_result = "adopted" if adopted else "rejected"
# Update session tallies
session.votes_for = votes_for
session.votes_against = votes_against
session.votes_total = total
session.threshold_required = float(threshold)
session.result = vote_result
session.status = "tallied"
await db.commit()
return {
"threshold": threshold,
"votes_for": votes_for,
"votes_against": votes_against,
"votes_total": total,
"adopted": adopted,
"smith_ok": smith_ok,
"smith_required": smith_required,
"techcomm_ok": techcomm_ok,
"techcomm_required": techcomm_required,
"result": vote_result,
}
async def _compute_nuanced(
session: VoteSession,
active_votes: list[Vote],
formula: FormulaConfig,
params: dict,
db: AsyncSession,
) -> dict:
"""Compute a nuanced vote result."""
vote_levels = [v.nuanced_level for v in active_votes if v.nuanced_level is not None]
threshold_pct = formula.nuanced_threshold_pct or 80
min_participants = formula.nuanced_min_participants or 59
evaluation = evaluate_nuanced(
votes=vote_levels,
threshold_pct=threshold_pct,
min_participants=min_participants,
)
vote_result = "adopted" if evaluation["adopted"] else "rejected"
session.votes_total = evaluation["total"]
session.votes_for = evaluation["positive_count"]
session.votes_against = evaluation["total"] - evaluation["positive_count"]
session.threshold_required = float(threshold_pct)
session.result = vote_result
session.status = "tallied"
await db.commit()
return {
"vote_type": "nuanced",
"result": vote_result,
**evaluation,
}
async def verify_vote_signature(address: str, signature: str, payload: str) -> bool:
"""Verify an Ed25519 signature from a Duniter V2 address.
Parameters
----------
address:
SS58 address of the voter.
signature:
Hex-encoded Ed25519 signature.
payload:
The original message that was signed.
Returns
-------
bool
True if the signature is valid.
TODO
----
Implement actual Ed25519 verification using substrate-interface:
from substrateinterface import Keypair
keypair = Keypair(ss58_address=address, crypto_type=KeypairType.ED25519)
return keypair.verify(payload.encode(), bytes.fromhex(signature))
"""
# TODO: Implement real Ed25519 verification with substrate-interface
# For now, accept all signatures in development mode
if not address or not signature or not payload:
return False
return True

View File

View File

@@ -0,0 +1,75 @@
"""Tests for mode-params string parser."""
from app.engine.mode_params import parse_mode_params
class TestParseModeParams:
"""Parse compact parameter strings into structured dicts."""
def test_standard_params(self):
"""D30M50B.1G.2 => standard Licence G1 params."""
result = parse_mode_params("D30M50B.1G.2")
assert result["duration_days"] == 30
assert result["majority_pct"] == 50
assert result["base_exponent"] == 0.1
assert result["gradient_exponent"] == 0.2
# Optional criteria absent
assert result["smith_exponent"] is None
assert result["techcomm_exponent"] is None
def test_with_smith_exponent(self):
"""D30M50B.1G.2S.1 => standard + smith_exponent=0.1."""
result = parse_mode_params("D30M50B.1G.2S.1")
assert result["duration_days"] == 30
assert result["majority_pct"] == 50
assert result["base_exponent"] == 0.1
assert result["gradient_exponent"] == 0.2
assert result["smith_exponent"] == 0.1
assert result["techcomm_exponent"] is None
def test_with_techcomm_exponent(self):
"""D30M50B.1G.2T.1 => standard + techcomm_exponent=0.1."""
result = parse_mode_params("D30M50B.1G.2T.1")
assert result["duration_days"] == 30
assert result["majority_pct"] == 50
assert result["base_exponent"] == 0.1
assert result["gradient_exponent"] == 0.2
assert result["smith_exponent"] is None
assert result["techcomm_exponent"] == 0.1
def test_full_params_with_constant(self):
"""D30M50B1G.5C10 => integer base, gradient=0.5, constant=10."""
result = parse_mode_params("D30M50B1G.5C10")
assert result["duration_days"] == 30
assert result["majority_pct"] == 50
assert result["base_exponent"] == 1.0
assert result["gradient_exponent"] == 0.5
assert result["constant_base"] == 10.0
def test_empty_string_defaults(self):
"""Empty string returns all defaults."""
result = parse_mode_params("")
assert result["duration_days"] == 30
assert result["majority_pct"] == 50
assert result["base_exponent"] == 0.1
assert result["gradient_exponent"] == 0.2
assert result["constant_base"] == 0.0
assert result["smith_exponent"] is None
assert result["techcomm_exponent"] is None
assert result["ratio_multiplier"] is None
assert result["is_ratio_mode"] is False
def test_whitespace_only_returns_defaults(self):
"""Whitespace-only string treated as empty."""
result = parse_mode_params(" ")
assert result["duration_days"] == 30
def test_roundtrip_consistency(self):
"""Parsing a standard string then re-checking all keys."""
result = parse_mode_params("D30M50B.1G.2")
expected_keys = {
"duration_days", "majority_pct", "base_exponent",
"gradient_exponent", "constant_base", "smith_exponent",
"techcomm_exponent", "ratio_multiplier", "is_ratio_mode",
}
assert set(result.keys()) == expected_keys

View File

@@ -0,0 +1,120 @@
"""Tests for six-level nuanced vote evaluation.
Levels: 0-CONTRE, 1-PAS DU TOUT, 2-PAS D'ACCORD, 3-NEUTRE, 4-D'ACCORD, 5-TOUT A FAIT
Positive = levels 3 + 4 + 5
Adoption requires: positive_pct >= threshold (80%) AND total >= min_participants (59).
"""
import pytest
from app.engine.nuanced_vote import evaluate_nuanced
class TestNuancedVoteAdoption:
"""Cases where the vote should be adopted."""
def test_59_positive_10_negative_adopted(self):
"""59 positive (levels 3-5) + 10 negative = 69 total.
positive_pct = 59/69 ~ 85.5% >= 80% and 69 >= 59 => adopted.
"""
votes = [5] * 20 + [4] * 20 + [3] * 19 + [2] * 5 + [1] * 3 + [0] * 2
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 69
assert result["positive_count"] == 59
assert result["positive_pct"] == pytest.approx(85.51, abs=0.1)
assert result["threshold_met"] is True
assert result["min_participants_met"] is True
assert result["adopted"] is True
def test_all_tout_a_fait_adopted(self):
"""All 59 voters at level 5 => 100% positive, adopted."""
votes = [5] * 59
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 59
assert result["positive_count"] == 59
assert result["positive_pct"] == 100.0
assert result["adopted"] is True
class TestNuancedVoteRejection:
"""Cases where the vote should be rejected."""
def test_40_positive_30_negative_rejected(self):
"""40 positive + 30 negative = 70 total.
positive_pct = 40/70 ~ 57.14% < 80% => threshold not met.
"""
votes = [5] * 15 + [4] * 15 + [3] * 10 + [2] * 10 + [1] * 10 + [0] * 10
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 70
assert result["positive_count"] == 40
assert result["positive_pct"] == pytest.approx(57.14, abs=0.1)
assert result["threshold_met"] is False
assert result["min_participants_met"] is True # 70 >= 59
assert result["adopted"] is False
def test_min_participants_not_met(self):
"""50 positive + 5 negative = 55 total < 59 min_participants.
Even though 50/55 ~ 90.9% > 80%, adoption fails on min_participants.
"""
votes = [5] * 30 + [4] * 10 + [3] * 10 + [1] * 3 + [0] * 2
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 55
assert result["positive_count"] == 50
assert result["positive_pct"] > 80
assert result["threshold_met"] is True
assert result["min_participants_met"] is False
assert result["adopted"] is False
class TestNuancedVoteEdgeCases:
"""Edge cases and exact boundary conditions."""
def test_exact_threshold_80_percent(self):
"""Exactly 80% positive votes should pass the threshold."""
# 80 positive out of 100 = exactly 80%
votes = [5] * 40 + [4] * 20 + [3] * 20 + [2] * 10 + [1] * 5 + [0] * 5
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 100
assert result["positive_count"] == 80
assert result["positive_pct"] == 80.0
assert result["threshold_met"] is True
assert result["min_participants_met"] is True
assert result["adopted"] is True
def test_just_below_threshold(self):
"""79 positive out of 100 = 79% < 80% => rejected."""
votes = [5] * 39 + [4] * 20 + [3] * 20 + [2] * 11 + [1] * 5 + [0] * 5
result = evaluate_nuanced(votes, threshold_pct=80, min_participants=59)
assert result["total"] == 100
assert result["positive_count"] == 79
assert result["positive_pct"] == 79.0
assert result["threshold_met"] is False
assert result["adopted"] is False
def test_empty_votes(self):
"""Zero votes => not adopted."""
result = evaluate_nuanced([], threshold_pct=80, min_participants=59)
assert result["total"] == 0
assert result["positive_count"] == 0
assert result["positive_pct"] == 0.0
assert result["adopted"] is False
def test_invalid_vote_level(self):
"""Vote level outside 0-5 raises ValueError."""
with pytest.raises(ValueError, match="invalide"):
evaluate_nuanced([5, 3, 6])
def test_per_level_counts(self):
"""Verify per-level breakdown is correct."""
votes = [0, 1, 2, 3, 4, 5, 5, 4, 3]
result = evaluate_nuanced(votes, threshold_pct=50, min_participants=1)
assert result["per_level_counts"] == {0: 1, 1: 1, 2: 1, 3: 2, 4: 2, 5: 2}
assert result["positive_count"] == 6 # 2+2+2
assert result["total"] == 9

View File

@@ -0,0 +1,180 @@
"""Tests for WoT threshold formula, Smith threshold, and TechComm threshold.
Real-world reference case:
Vote Engagement Forgeron v2.0.0 (Feb 2026)
wot_size=7224, votes_for=97, votes_against=23, total=120
params M=50, B=0.1, G=0.2 => threshold=94 => adopted (97 >= 94)
"""
import math
import pytest
from app.engine.threshold import wot_threshold
from app.engine.smith_threshold import smith_threshold
from app.engine.techcomm_threshold import techcomm_threshold
# ---------------------------------------------------------------------------
# WoT threshold: real-world vote Forgeron
# ---------------------------------------------------------------------------
class TestWotThresholdForgeron:
"""Test with the actual Engagement Forgeron v2.0.0 vote numbers."""
def test_forgeron_vote_passes(self):
"""97 votes_for out of 120 total (wot=7224) must pass."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# With low participation (120/7224 ~ 1.66%), near-unanimity is required.
# The historical threshold was 94, and 97 >= 94.
assert 97 >= threshold
# The threshold should be high relative to total votes (inertia effect)
assert threshold > 60, f"Threshold {threshold} should be well above simple majority"
def test_forgeron_vote_threshold_value(self):
"""Verify the computed threshold is in a reasonable range."""
threshold = wot_threshold(
wot_size=7224,
total_votes=120,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# At ~1.66% participation, inertia should push threshold close to 78-95%
# of total votes. The exact value depends on the formula.
assert 80 <= threshold <= 120
# ---------------------------------------------------------------------------
# WoT threshold: low participation
# ---------------------------------------------------------------------------
class TestWotThresholdLowParticipation:
"""With very low participation, near-unanimity should be required."""
def test_ten_votes_out_of_7224(self):
"""10 voters out of 7224 => nearly all must vote 'for'."""
threshold = wot_threshold(
wot_size=7224,
total_votes=10,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# With participation ratio 10/7224 ~ 0.14%, threshold should be
# very close to total_votes (near-unanimity).
assert threshold >= 9, f"Expected near-unanimity but got threshold={threshold}"
assert threshold <= 10
# ---------------------------------------------------------------------------
# WoT threshold: high participation
# ---------------------------------------------------------------------------
class TestWotThresholdHighParticipation:
"""With high participation, threshold should approach simple majority M."""
def test_3000_votes_out_of_7224(self):
"""3000/7224 ~ 41.5% participation => threshold closer to 50%."""
threshold = wot_threshold(
wot_size=7224,
total_votes=3000,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# With ~42% participation, the inertia factor diminishes.
# threshold should be well below 90% of votes but above simple majority.
simple_majority = math.ceil(3000 * 0.5)
assert threshold >= simple_majority, (
f"Threshold {threshold} should be at least simple majority {simple_majority}"
)
# Should be noticeably less than near-unanimity
assert threshold < 2700, (
f"Threshold {threshold} should be much less than near-unanimity at high participation"
)
# ---------------------------------------------------------------------------
# WoT threshold: edge cases
# ---------------------------------------------------------------------------
class TestWotThresholdEdgeCases:
"""Edge-case behaviour."""
def test_zero_total_votes(self):
"""With zero votes, threshold is ceil(C + B^W)."""
threshold = wot_threshold(
wot_size=7224,
total_votes=0,
majority_pct=50,
base_exponent=0.1,
gradient_exponent=0.2,
)
# B^W = 0.1^7224 is effectively 0
expected = math.ceil(0.0 + 0.1 ** 7224)
assert threshold == expected
def test_invalid_wot_size_zero(self):
with pytest.raises(ValueError, match="wot_size"):
wot_threshold(wot_size=0, total_votes=10)
def test_invalid_negative_votes(self):
with pytest.raises(ValueError, match="total_votes"):
wot_threshold(wot_size=100, total_votes=-1)
def test_invalid_majority_pct(self):
with pytest.raises(ValueError, match="majority_pct"):
wot_threshold(wot_size=100, total_votes=10, majority_pct=150)
# ---------------------------------------------------------------------------
# Smith threshold
# ---------------------------------------------------------------------------
class TestSmithThreshold:
"""Test Smith sub-WoT threshold: ceil(smith_size ^ S)."""
def test_smith_size_20_exponent_01(self):
"""smith_size=20, exponent=0.1 => ceil(20^0.1)."""
result = smith_threshold(smith_wot_size=20, exponent=0.1)
expected = math.ceil(20 ** 0.1)
assert result == expected
# 20^0.1 ~ 1.35, ceil => 2
assert result == 2
def test_smith_size_1(self):
"""smith_size=1 => ceil(1^0.1) = 1."""
assert smith_threshold(smith_wot_size=1, exponent=0.1) == 1
def test_smith_invalid(self):
with pytest.raises(ValueError):
smith_threshold(smith_wot_size=0)
# ---------------------------------------------------------------------------
# TechComm threshold
# ---------------------------------------------------------------------------
class TestTechcommThreshold:
"""Test TechComm threshold: ceil(cotec_size ^ T)."""
def test_cotec_size_5_exponent_01(self):
"""cotec_size=5, exponent=0.1 => ceil(5^0.1)."""
result = techcomm_threshold(cotec_size=5, exponent=0.1)
expected = math.ceil(5 ** 0.1)
assert result == expected
# 5^0.1 ~ 1.175, ceil => 2
assert result == 2
def test_cotec_size_1(self):
assert techcomm_threshold(cotec_size=1, exponent=0.1) == 1
def test_cotec_invalid(self):
with pytest.raises(ValueError):
techcomm_threshold(cotec_size=0)