Files
decision/backend/app/routers/public.py
Yvv ed9ed11cd4 Toolbox 30rem sticky + accordéons collapsibles + renommage libreDecision
- Boîte à outils élargie à 30rem (×1.75) — flottante sticky, zéro scroll visible
- ToolboxSection : nouveau composant accordéon générique (chevron, défaut fermé)
- ToolboxVignette : titre cliquable, bullets/actions cachés par défaut
- 4 pages : ContextMapper/SocioElection/WorkflowMilestones/inertie → ToolboxSection
- Suppression doublon SectionLayout (common/) — conflit de nommage résolu
- Renommage complet Glibredecision → libreDecision dans configs/docker/CI
- README.md + CONTRIBUTING.md ajoutés

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 00:48:20 +01:00

250 lines
8.3 KiB
Python

"""Public API router: read-only endpoints for external consumption.
All endpoints are accessible without authentication.
No mutations allowed -- strictly read-only.
"""
from __future__ import annotations
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.decision import Decision
from app.models.document import Document, DocumentItem
from app.models.sanctuary import SanctuaryEntry
from app.models.vote import VoteSession
from app.schemas.document import DocumentFullOut, DocumentItemOut, DocumentOut
from app.schemas.sanctuary import SanctuaryEntryOut
from app.services import document_service, sanctuary_service
# Router on which every endpoint of this module is registered via @router.get.
router = APIRouter()
# ── Documents (public, read-only) ─────────────────────────────────────────
@router.get(
    "/documents",
    response_model=list[DocumentOut],
    tags=["public-documents"],
    summary="Liste publique des documents actifs",
)
async def list_documents(
    db: AsyncSession = Depends(get_db),
    doc_type: str | None = Query(default=None, description="Filtrer par type de document"),
    status_filter: str | None = Query(default=None, alias="status", description="Filtrer par statut"),
    skip: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=1, le=200),
) -> list[DocumentOut]:
    """List documents, newest first, each with its item count (read-only).

    Args:
        db: Async database session injected by FastAPI.
        doc_type: When given, keep only documents with this ``doc_type``.
        status_filter: When given (query parameter ``status``), keep only
            documents with this status.
        skip: Number of documents to skip.
        limit: Maximum number of documents to return (1 to 200).

    Returns:
        The matching documents, with ``items_count`` set on each of them.
    """
    # NOTE(review): the route summary says "actifs", but no status filter is
    # applied unless the caller passes ``status`` -- confirm the intent.
    stmt = select(Document)
    if doc_type is not None:
        stmt = stmt.where(Document.doc_type == doc_type)
    if status_filter is not None:
        stmt = stmt.where(Document.status == status_filter)
    stmt = stmt.order_by(Document.created_at.desc()).offset(skip).limit(limit)

    result = await db.execute(stmt)
    documents = result.scalars().all()
    if not documents:
        # Nothing to count; skip the second query entirely.
        return []

    # Count the items of every listed document in ONE grouped query instead
    # of issuing one COUNT per document (N+1 round-trips to the database).
    counts_stmt = (
        select(DocumentItem.document_id, func.count())
        .where(DocumentItem.document_id.in_([doc.id for doc in documents]))
        .group_by(DocumentItem.document_id)
    )
    counts_result = await db.execute(counts_stmt)
    items_by_document = {doc_id: count for doc_id, count in counts_result.all()}

    out = []
    for doc in documents:
        doc_out = DocumentOut.model_validate(doc)
        # Documents without any item produce no row in the grouped query.
        doc_out.items_count = items_by_document.get(doc.id, 0)
        out.append(doc_out)
    return out
@router.get(
    "/documents/{slug}",
    response_model=DocumentFullOut,
    tags=["public-documents"],
    summary="Document complet avec ses items",
)
async def get_document(
    slug: str,
    db: AsyncSession = Depends(get_db),
) -> DocumentFullOut:
    """Return the document identified by ``slug``, serialized with its items.

    The lookup is delegated to ``document_service.get_document_with_items``.

    Raises:
        HTTPException: 404 when the service returns no document for ``slug``.
    """
    document = await document_service.get_document_with_items(slug, db)
    if document is not None:
        return DocumentFullOut.model_validate(document)

    # The service found nothing for this slug.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Document introuvable",
    )
# ── Sanctuary (public, read-only) ─────────────────────────────────────────
@router.get(
    "/sanctuary/entries",
    response_model=list[SanctuaryEntryOut],
    tags=["public-sanctuary"],
    summary="Liste des entrees du sanctuaire",
)
async def list_sanctuary_entries(
    db: AsyncSession = Depends(get_db),
    entry_type: str | None = Query(default=None, description="Filtrer par type (document, decision, vote_result)"),
    skip: int = Query(default=0, ge=0),
    limit: int = Query(default=50, ge=1, le=200),
) -> list[SanctuaryEntryOut]:
    """List sanctuary entries, newest first (read-only).

    Args:
        db: Async database session injected by FastAPI.
        entry_type: When given, keep only entries of this type.
        skip: Number of entries to skip.
        limit: Maximum number of entries to return (1 to 200).

    Returns:
        The matching entries serialized as ``SanctuaryEntryOut``.
    """
    query = select(SanctuaryEntry).order_by(SanctuaryEntry.created_at.desc())
    if entry_type is not None:
        query = query.where(SanctuaryEntry.entry_type == entry_type)
    query = query.offset(skip).limit(limit)

    rows = (await db.execute(query)).scalars().all()
    return list(map(SanctuaryEntryOut.model_validate, rows))
@router.get(
    "/sanctuary/entries/{id}",
    response_model=SanctuaryEntryOut,
    tags=["public-sanctuary"],
    summary="Entree du sanctuaire par ID",
)
async def get_sanctuary_entry(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
) -> SanctuaryEntryOut:
    """Return the sanctuary entry whose primary key is ``id``.

    Raises:
        HTTPException: 404 when no entry has this id.
    """
    # NOTE(review): the original description mentions an IPFS link and an
    # on-chain anchor; those fields live in SanctuaryEntryOut, not shown here.
    lookup = select(SanctuaryEntry).where(SanctuaryEntry.id == id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found is not None:
        return SanctuaryEntryOut.model_validate(found)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Entree sanctuaire introuvable",
    )
@router.get(
    "/sanctuary/verify/{id}",
    tags=["public-sanctuary"],
    summary="Verification d'integrite d'une entree",
)
async def verify_sanctuary_entry(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Run the integrity check of a sanctuary entry and return its report.

    The check itself is delegated to ``sanctuary_service.verify_entry``; its
    result is returned unchanged.

    Raises:
        HTTPException: 404 when the service raises ``ValueError``; the
            exception message is used as the response detail.
    """
    try:
        result = await sanctuary_service.verify_entry(id, db)
    except ValueError as exc:
        # Presumably raised for an unknown entry id -- confirm in the service.
        # Chain the cause so logs and tracebacks keep the original error.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(exc),
        ) from exc
    return result
# ── Votes (public, read-only) ─────────────────────────────────────────────
@router.get(
    "/votes/sessions/{id}/result",
    tags=["public-votes"],
    summary="Resultat d'une session de vote",
)
async def get_vote_result(
    id: uuid.UUID,
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Return the public result payload of one vote session (read-only).

    Returns:
        A dict with the session id (as a string), its status, tallies, size
        and threshold figures, result, start/end timestamps in ISO 8601
        (``None`` when unset) and the chain recording fields.

    Raises:
        HTTPException: 404 when no vote session has this id.
    """
    query = select(VoteSession).where(VoteSession.id == id)
    vote_session = (await db.execute(query)).scalar_one_or_none()
    if vote_session is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Session de vote introuvable",
        )

    # Keys are inserted in the exact order of the response contract.
    payload: dict = {"session_id": str(vote_session.id)}

    # Attributes copied verbatim from the model.
    for field in (
        "status",
        "votes_for",
        "votes_against",
        "votes_total",
        "wot_size",
        "smith_size",
        "techcomm_size",
        "threshold_required",
        "result",
    ):
        payload[field] = getattr(vote_session, field)

    # Timestamps are optional; serialize them only when set.
    for field in ("starts_at", "ends_at"):
        moment = getattr(vote_session, field)
        payload[field] = moment.isoformat() if moment else None

    payload["chain_recorded"] = vote_session.chain_recorded
    payload["chain_tx_hash"] = vote_session.chain_tx_hash
    return payload
# ── Platform status ───────────────────────────────────────────────────────
@router.get(
    "/status",
    tags=["public-status"],
    summary="Statut de la plateforme",
)
async def platform_status(
    db: AsyncSession = Depends(get_db),
) -> dict:
    """Return platform-wide counters: documents, decisions, votes, sanctuary.

    Returns:
        A dict with the platform name and the row counts of documents,
        decisions, vote sessions whose status is ``"open"``, all vote
        sessions, and sanctuary entries.
    """

    def total(model):
        """Build a ``SELECT count(*)`` statement over ``model``."""
        return select(func.count()).select_from(model)

    async def count_rows(stmt) -> int:
        """Execute a count statement, mapping a missing value to 0."""
        return (await db.execute(stmt)).scalar() or 0

    # Dict values are evaluated top to bottom, so the queries run in the
    # order: documents, decisions, open votes, all votes, sanctuary.
    return {
        "platform": "libreDecision",
        "documents_count": await count_rows(total(Document)),
        "decisions_count": await count_rows(total(Decision)),
        "active_votes_count": await count_rows(
            total(VoteSession).where(VoteSession.status == "open")
        ),
        "total_votes_count": await count_rows(total(VoteSession)),
        "sanctuary_entries_count": await count_rows(total(SanctuaryEntry)),
    }