Files
decision/backend/app/services/sanctuary_service.py
Yvv 2bdc731639 Sprint 2 : moteur de documents + sanctuaire
Backend:
- CRUD complet documents/items/versions (update, delete, accept, reject, reorder)
- Service IPFS (upload/retrieve/pin via kubo HTTP API)
- Service sanctuaire : pipeline SHA-256 + IPFS + on-chain (system.remark)
- Verification integrite des entrees sanctuaire
- Recherche par reference (document -> entrees sanctuaire)
- Serialisation deterministe des documents pour archivage
- 14 tests unitaires supplementaires (document service)

Frontend:
- 9 composants : StatusBadge, MarkdownRenderer, DiffView, ItemCard,
  ItemVersionDiff, DocumentList, SanctuaryEntry, IPFSLink, ChainAnchor
- Page detail item avec historique des versions et diff
- Page detail sanctuaire avec verification integrite
- Modal de creation de document + proposition de version
- Archivage document vers sanctuaire depuis la page detail

Documentation:
- API reference mise a jour (9 nouveaux endpoints)
- Guides utilisateur documents et sanctuaire enrichis

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 13:08:48 +01:00

261 lines
7.8 KiB
Python

"""Sanctuary service: immutable archival to IPFS + on-chain hash.
The sanctuary is the immutable layer of Glibredecision. Every adopted
document version, decision result, or vote tally is hashed (SHA-256),
stored on IPFS, and anchored on-chain via system.remark.
"""
from __future__ import annotations
import hashlib
import json
import logging
import uuid
from datetime import datetime, timezone
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.sanctuary import SanctuaryEntry
from app.services import ipfs_service
logger = logging.getLogger(__name__)
async def archive_to_sanctuary(
    entry_type: str,
    reference_id: uuid.UUID,
    content: str,
    title: str,
    db: AsyncSession,
) -> SanctuaryEntry:
    """Hash content and create a sanctuary entry.

    Pipeline:
    1. Hash content (SHA-256)
    2. Try to upload to IPFS via ipfs_service (catch errors, log, continue)
    3. Try to anchor on-chain via ``_anchor_on_chain`` (catch errors, log, continue)
    4. Create SanctuaryEntry with whatever succeeded

    Steps 2 and 3 are best-effort: any failure is logged and the entry is
    still created, so the local SHA-256 record is never lost.

    Parameters
    ----------
    entry_type:
        Type of the archived entity (``"document"``, ``"decision"``,
        ``"vote_result"``).
    reference_id:
        UUID of the source entity (document, decision, or vote session).
    content:
        The full text content to archive and hash.
    title:
        Human-readable title for the archive entry.
    db:
        Async database session.

    Returns
    -------
    SanctuaryEntry
        The newly created sanctuary entry with content_hash set.
    """
    # 1. SHA-256 digest (hex) of the UTF-8 encoded content.
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()

    # Metadata persisted as JSON on the entry; enriched below as each
    # optional step (IPFS, on-chain) succeeds.
    metadata: dict = {
        "archived_at": datetime.now(timezone.utc).isoformat(),
        "entry_type": entry_type,
        "content_length": len(content),
    }
    ipfs_cid: str | None = None
    chain_tx_hash: str | None = None
    chain_block: int | None = None

    # 2. Try to upload to IPFS.
    try:
        ipfs_cid = await ipfs_service.upload_to_ipfs(content)
        if ipfs_cid:
            # Record the CID in metadata *before* pinning, so that
            # metadata_json stays consistent with the ipfs_cid column
            # even if the pin call below fails.
            metadata["ipfs_cid"] = ipfs_cid
            logger.info("Contenu archive sur IPFS: CID=%s", ipfs_cid)
            # Pin the content to keep it available on the node; a pin
            # failure is non-fatal and must not be reported as an
            # upload failure.
            try:
                await ipfs_service.pin(ipfs_cid)
            except Exception:
                logger.warning(
                    "Erreur lors du pin IPFS (CID=%s)",
                    ipfs_cid,
                    exc_info=True,
                )
        else:
            logger.warning("Upload IPFS echoue (retour None) pour %s:%s", entry_type, reference_id)
    except Exception:
        logger.warning(
            "Erreur lors de l'upload IPFS pour %s:%s",
            entry_type, reference_id,
            exc_info=True,
        )

    # 3. Try to anchor on-chain (still a structured stub).
    try:
        chain_tx_hash, chain_block = await _anchor_on_chain(content_hash)
        if chain_tx_hash:
            metadata["chain_tx_hash"] = chain_tx_hash
            metadata["chain_block"] = chain_block
            logger.info("Hash ancre on-chain: tx=%s block=%s", chain_tx_hash, chain_block)
    except NotImplementedError:
        # Expected until the substrate integration lands; not an error.
        logger.info("Ancrage on-chain pas encore implemente, etape ignoree")
    except Exception:
        logger.warning(
            "Erreur lors de l'ancrage on-chain pour %s:%s",
            entry_type, reference_id,
            exc_info=True,
        )

    # 4. Create SanctuaryEntry with whatever succeeded.
    entry = SanctuaryEntry(
        entry_type=entry_type,
        reference_id=reference_id,
        title=title,
        content_hash=content_hash,
        ipfs_cid=ipfs_cid,
        chain_tx_hash=chain_tx_hash,
        chain_block=chain_block,
        metadata_json=json.dumps(metadata, ensure_ascii=False),
    )
    db.add(entry)
    await db.commit()
    await db.refresh(entry)
    return entry
async def verify_entry(
    entry_id: uuid.UUID,
    db: AsyncSession,
) -> dict:
    """Verify the integrity of a sanctuary entry.

    Re-fetches the content (from IPFS if available) and re-hashes it
    to compare with the stored content_hash.

    Parameters
    ----------
    entry_id:
        UUID of the SanctuaryEntry to verify.
    db:
        Async database session.

    Returns
    -------
    dict
        Verification result with keys:
        - ``entry_id``: UUID of the entry
        - ``valid``: bool indicating if the hash matches
        - ``stored_hash``: the stored content_hash
        - ``computed_hash``: the re-computed hash (or None if content unavailable)
        - ``source``: where the content was fetched from (``"ipfs"`` or ``"unavailable"``)
        - ``detail``: human-readable detail message

    Raises
    ------
    ValueError
        If the entry is not found.
    """
    row = await db.execute(
        select(SanctuaryEntry).where(SanctuaryEntry.id == entry_id)
    )
    entry = row.scalar_one_or_none()
    if entry is None:
        raise ValueError(f"Entree sanctuaire introuvable : {entry_id}")

    recomputed: str | None = None
    origin = "unavailable"

    # Re-fetch the archived bytes from IPFS when a CID is recorded;
    # any retrieval failure leaves the content unavailable.
    if entry.ipfs_cid:
        try:
            raw = await ipfs_service.get_from_ipfs(entry.ipfs_cid)
        except Exception:
            logger.warning(
                "Impossible de recuperer le contenu IPFS pour verification (CID=%s)",
                entry.ipfs_cid,
                exc_info=True,
            )
        else:
            if raw is not None:
                recomputed = hashlib.sha256(raw).hexdigest()
                origin = "ipfs"

    # Start from the "content unavailable" report and upgrade it when a
    # hash could actually be recomputed.
    report = {
        "entry_id": entry.id,
        "valid": False,
        "stored_hash": entry.content_hash,
        "computed_hash": recomputed,
        "source": origin,
        "detail": "Contenu indisponible pour la verification",
    }
    if recomputed is not None:
        matches = recomputed == entry.content_hash
        report["valid"] = matches
        report["detail"] = (
            "Integrite verifiee"
            if matches
            else "Hash different - contenu potentiellement altere"
        )
    return report
async def get_entries_by_reference(
    reference_id: uuid.UUID,
    db: AsyncSession,
) -> list[SanctuaryEntry]:
    """Query all sanctuary entries for a given reference_id.

    Parameters
    ----------
    reference_id:
        UUID of the referenced entity (document, decision, etc.).
    db:
        Async database session.

    Returns
    -------
    list[SanctuaryEntry]
        All entries matching the reference_id, ordered by creation date desc.
    """
    # Newest entries first, so the latest archive is always at index 0.
    stmt = (
        select(SanctuaryEntry)
        .where(SanctuaryEntry.reference_id == reference_id)
        .order_by(SanctuaryEntry.created_at.desc())
    )
    rows = await db.execute(stmt)
    return [entry for entry in rows.scalars()]
async def _anchor_on_chain(content_hash: str) -> tuple[str | None, int | None]:
    """Anchor a content hash on-chain via system.remark.

    This is still a stub: it always raises ``NotImplementedError``, which
    the caller (``archive_to_sanctuary``) treats as "skip this step".
    The eventual implementation is expected to use substrate-interface to
    submit a system.remark extrinsic carrying the hash, along these lines::

        from substrateinterface import SubstrateInterface
        from app.config import settings

        substrate = SubstrateInterface(url=settings.DUNITER_RPC_URL)
        call = substrate.compose_call(
            call_module="System",
            call_function="remark",
            call_params={"remark": f"glibredecision:sanctuary:{content_hash}"},
        )
        extrinsic = substrate.create_signed_extrinsic(call=call, keypair=keypair)
        receipt = substrate.submit_extrinsic(extrinsic, wait_for_inclusion=True)
        return receipt.extrinsic_hash, receipt.block_number

    Parameters
    ----------
    content_hash:
        The SHA-256 hash to anchor.

    Returns
    -------
    tuple[str | None, int | None]
        (tx_hash, block_number) or (None, None) if not implemented.
    """
    raise NotImplementedError("Ancrage on-chain pas encore implemente")