"""Blockchain service: retrieve on-chain data from Duniter V2. Provides functions to query WoT size, Smith sub-WoT size, and Technical Committee size from the Duniter V2 blockchain. Architecture: 1. Check database cache (via cache_service) 2. Try JSON-RPC call to Duniter node 3. Fall back to hardcoded GDev test values (with warning log) All public functions accept a db session for cache access. """ from __future__ import annotations import logging import httpx from sqlalchemy.ext.asyncio import AsyncSession from app.config import settings from app.services import cache_service logger = logging.getLogger(__name__) # Hardcoded fallback values from GDev snapshot _FALLBACK_WOT_SIZE = 7224 _FALLBACK_SMITH_SIZE = 20 _FALLBACK_TECHCOMM_SIZE = 5 # Cache key prefixes _CACHE_KEY_WOT = "blockchain:wot_size" _CACHE_KEY_SMITH = "blockchain:smith_size" _CACHE_KEY_TECHCOMM = "blockchain:techcomm_size" async def _fetch_from_rpc(method: str, params: list | None = None) -> dict | None: """Send a JSON-RPC POST request to the Duniter node. Uses the HTTP variant of the RPC URL. If DUNITER_RPC_URL starts with ``wss://`` or ``ws://``, it is converted to ``https://`` or ``http://`` for the HTTP JSON-RPC endpoint. Parameters ---------- method: The RPC method name (e.g. ``"state_getStorage"``). params: Optional list of parameters for the RPC call. Returns ------- dict | None The ``"result"`` field from the JSON-RPC response, or None on error. """ # Convert WebSocket URL to HTTP for JSON-RPC rpc_url = settings.DUNITER_RPC_URL if rpc_url.startswith("wss://"): rpc_url = rpc_url.replace("wss://", "https://", 1) elif rpc_url.startswith("ws://"): rpc_url = rpc_url.replace("ws://", "http://", 1) # Strip /ws suffix if present if rpc_url.endswith("/ws"): rpc_url = rpc_url[:-3] payload = { "jsonrpc": "2.0", "id": 1, "method": method, "params": params or [], } try: async with httpx.AsyncClient( timeout=settings.DUNITER_RPC_TIMEOUT_SECONDS ) as client: response = await client.post(rpc_url, json=payload) response.raise_for_status() data = response.json() if "error" in data: logger.warning( "Erreur RPC Duniter pour %s: %s", method, data["error"], ) return None return data.get("result") except httpx.ConnectError: logger.warning( "Impossible de se connecter au noeud Duniter (%s)", rpc_url ) return None except httpx.TimeoutException: logger.warning( "Timeout lors de l'appel RPC Duniter pour %s (%s)", method, rpc_url, ) return None except httpx.HTTPStatusError as exc: logger.warning( "Erreur HTTP Duniter RPC pour %s: %s", method, exc.response.status_code, ) return None except Exception: logger.warning( "Erreur inattendue lors de l'appel RPC Duniter pour %s", method, exc_info=True, ) return None async def _fetch_membership_count(db: AsyncSession) -> int | None: """Fetch WoT membership count from the Duniter RPC. Queries ``membership_membershipsCount`` via state RPC. Returns ------- int | None The membership count, or None if the RPC call failed. """ # Try runtime API call for membership count result = await _fetch_from_rpc("membership_membershipsCount", []) if result is not None: try: count = int(result) # Cache the result await cache_service.set_cached( _CACHE_KEY_WOT, {"value": count}, db, ttl_seconds=settings.BLOCKCHAIN_CACHE_TTL_SECONDS, ) return count except (ValueError, TypeError): logger.warning("Reponse RPC invalide pour membership count: %s", result) return None async def _fetch_smith_count(db: AsyncSession) -> int | None: """Fetch Smith membership count from the Duniter RPC. Returns ------- int | None The Smith member count, or None if the RPC call failed. """ result = await _fetch_from_rpc("smithMembers_smithMembersCount", []) if result is not None: try: count = int(result) await cache_service.set_cached( _CACHE_KEY_SMITH, {"value": count}, db, ttl_seconds=settings.BLOCKCHAIN_CACHE_TTL_SECONDS, ) return count except (ValueError, TypeError): logger.warning("Reponse RPC invalide pour smith count: %s", result) return None async def _fetch_techcomm_count(db: AsyncSession) -> int | None: """Fetch Technical Committee member count from the Duniter RPC. Returns ------- int | None The TechComm member count, or None if the RPC call failed. """ result = await _fetch_from_rpc("technicalCommittee_members", []) if result is not None: try: if isinstance(result, list): count = len(result) else: count = int(result) await cache_service.set_cached( _CACHE_KEY_TECHCOMM, {"value": count}, db, ttl_seconds=settings.BLOCKCHAIN_CACHE_TTL_SECONDS, ) return count except (ValueError, TypeError): logger.warning("Reponse RPC invalide pour techcomm count: %s", result) return None async def get_wot_size(db: AsyncSession) -> int: """Return the current number of WoT members. Resolution order: 1. Database cache (if not expired) 2. Duniter RPC call 3. Hardcoded fallback (7224, GDev snapshot) Parameters ---------- db: Async database session (for cache access). Returns ------- int Number of WoT members. """ # 1. Try cache cached = await cache_service.get_cached(_CACHE_KEY_WOT, db) if cached is not None: return cached["value"] # 2. Try RPC rpc_value = await _fetch_membership_count(db) if rpc_value is not None: return rpc_value # 3. Fallback logger.warning( "Utilisation de la valeur WoT par defaut (%d) - " "cache et RPC indisponibles", _FALLBACK_WOT_SIZE, ) return _FALLBACK_WOT_SIZE async def get_smith_size(db: AsyncSession) -> int: """Return the current number of Smith members (forgerons). Resolution order: 1. Database cache (if not expired) 2. Duniter RPC call 3. Hardcoded fallback (20, GDev snapshot) Parameters ---------- db: Async database session (for cache access). Returns ------- int Number of Smith members. """ # 1. Try cache cached = await cache_service.get_cached(_CACHE_KEY_SMITH, db) if cached is not None: return cached["value"] # 2. Try RPC rpc_value = await _fetch_smith_count(db) if rpc_value is not None: return rpc_value # 3. Fallback logger.warning( "Utilisation de la valeur Smith par defaut (%d) - " "cache et RPC indisponibles", _FALLBACK_SMITH_SIZE, ) return _FALLBACK_SMITH_SIZE async def get_techcomm_size(db: AsyncSession) -> int: """Return the current number of Technical Committee members. Resolution order: 1. Database cache (if not expired) 2. Duniter RPC call 3. Hardcoded fallback (5, GDev snapshot) Parameters ---------- db: Async database session (for cache access). Returns ------- int Number of TechComm members. """ # 1. Try cache cached = await cache_service.get_cached(_CACHE_KEY_TECHCOMM, db) if cached is not None: return cached["value"] # 2. Try RPC rpc_value = await _fetch_techcomm_count(db) if rpc_value is not None: return rpc_value # 3. Fallback logger.warning( "Utilisation de la valeur TechComm par defaut (%d) - " "cache et RPC indisponibles", _FALLBACK_TECHCOMM_SIZE, ) return _FALLBACK_TECHCOMM_SIZE