"""IPFS service: upload, retrieve, and pin content via kubo HTTP API. Uses httpx async client to communicate with the local kubo node. All operations handle connection errors gracefully: they log a warning and return None instead of crashing the caller. """ from __future__ import annotations import logging import httpx from app.config import settings logger = logging.getLogger(__name__) # Timeout for IPFS operations (seconds) _IPFS_TIMEOUT = 30.0 async def upload_to_ipfs(content: str | bytes) -> str | None: """Upload content to IPFS via kubo HTTP API (POST /api/v0/add). Parameters ---------- content: The content to upload. Strings are encoded as UTF-8. Returns ------- str | None The IPFS CID (Content Identifier) of the uploaded content, or None if the upload failed. """ if isinstance(content, str): content = content.encode("utf-8") try: async with httpx.AsyncClient(timeout=_IPFS_TIMEOUT) as client: response = await client.post( f"{settings.IPFS_API_URL}/api/v0/add", files={"file": ("content.txt", content, "application/octet-stream")}, ) response.raise_for_status() data = response.json() cid = data.get("Hash") if cid: logger.info("Contenu uploade sur IPFS: CID=%s", cid) return cid except httpx.ConnectError: logger.warning("Impossible de se connecter au noeud IPFS (%s)", settings.IPFS_API_URL) return None except httpx.HTTPStatusError as exc: logger.warning("Erreur HTTP IPFS lors de l'upload: %s", exc.response.status_code) return None except Exception: logger.warning("Erreur inattendue lors de l'upload IPFS", exc_info=True) return None async def get_from_ipfs(cid: str) -> bytes | None: """Retrieve content from IPFS by CID via the gateway. Parameters ---------- cid: The IPFS Content Identifier to retrieve. Returns ------- bytes | None The raw content bytes, or None if retrieval failed. """ try: async with httpx.AsyncClient(timeout=_IPFS_TIMEOUT) as client: response = await client.post( f"{settings.IPFS_API_URL}/api/v0/cat", params={"arg": cid}, ) response.raise_for_status() logger.info("Contenu recupere depuis IPFS: CID=%s", cid) return response.content except httpx.ConnectError: logger.warning("Impossible de se connecter au noeud IPFS (%s)", settings.IPFS_API_URL) return None except httpx.HTTPStatusError as exc: logger.warning("Erreur HTTP IPFS lors de la recuperation (CID=%s): %s", cid, exc.response.status_code) return None except Exception: logger.warning("Erreur inattendue lors de la recuperation IPFS (CID=%s)", cid, exc_info=True) return None async def pin(cid: str) -> bool: """Pin content on the local IPFS node to prevent garbage collection. Parameters ---------- cid: The IPFS Content Identifier to pin. Returns ------- bool True if pinning succeeded, False otherwise. """ try: async with httpx.AsyncClient(timeout=_IPFS_TIMEOUT) as client: response = await client.post( f"{settings.IPFS_API_URL}/api/v0/pin/add", params={"arg": cid}, ) response.raise_for_status() logger.info("Contenu epingle sur IPFS: CID=%s", cid) return True except httpx.ConnectError: logger.warning("Impossible de se connecter au noeud IPFS pour l'epinglage (%s)", settings.IPFS_API_URL) return False except httpx.HTTPStatusError as exc: logger.warning("Erreur HTTP IPFS lors de l'epinglage (CID=%s): %s", cid, exc.response.status_code) return False except Exception: logger.warning("Erreur inattendue lors de l'epinglage IPFS (CID=%s)", cid, exc_info=True) return False