"""AI framing service for decision qualification. Orchestrates a 2-round conversation that clarifies reversibility and urgency before producing a final QualificationResult. Two entry points: ai_frame() — synchronous, rule-based only. Used by tests. ai_frame_async() — async, enriched by Claude API (or Qwen3.6 later). Falls back to ai_frame() if ANTHROPIC_API_KEY is unset. The interface (AIFrameRequest / AIFrameResponse) is stable; the underlying engine is swappable (Qwen3.6 MacStudio planned to replace Claude calls). """ from __future__ import annotations import json import logging from dataclasses import dataclass logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Schemas (dataclasses — no Pydantic dependency in the engine layer) # --------------------------------------------------------------------------- @dataclass class AIMessage: role: str # "user" | "assistant" content: str @dataclass class AIQuestion: id: str text: str options: list[str] @dataclass class AIQualifyResult: decision_type: str process: str recommended_modalities: list[str] recommend_onchain: bool onchain_reason: str | None confidence: str collective_available: bool record_in_observatory: bool reasons: list[str] @dataclass class AIFrameRequest: within_mandate: bool = False affected_count: int | None = None is_structural: bool = False context: str | None = None messages: list[AIMessage] | None = None def __post_init__(self): if self.messages is None: self.messages = [] @dataclass class AIFrameResponse: done: bool questions: list[AIQuestion] result: AIQualifyResult | None explanation: str | None # --------------------------------------------------------------------------- # Standard clarifying questions (fallback when no context or API unavailable) # --------------------------------------------------------------------------- _CLARIFYING_QUESTIONS: list[AIQuestion] = [ AIQuestion( id="reversibility", text="Si cette décision s'avère inappropriée dans 6 mois, peut-on facilement revenir en arrière ?", options=[ "Oui, facilement", "Difficilement", "Non, c'est irréversible", ], ), AIQuestion( id="urgency", text="Y a-t-il une contrainte temporelle sur cette décision ?", options=[ "Urgente (< 1 semaine)", "Délai raisonnable (quelques semaines)", "Pas d'urgence", ], ), ] # --------------------------------------------------------------------------- # Synchronous rule-based engine (used directly by tests) # --------------------------------------------------------------------------- def ai_frame(request: AIFrameRequest) -> AIFrameResponse: """Run one round of rule-based AI framing. Round 1 (messages=[]) → return 2 clarifying questions, done=False Round 2 (messages set) → parse answers, qualify, return result, done=True """ messages = request.messages or [] if not messages: return AIFrameResponse( done=False, questions=list(_CLARIFYING_QUESTIONS), result=None, explanation=None, ) answers = _parse_answers(messages) result = _build_result(request, answers) explanation = _build_explanation(answers) return AIFrameResponse( done=True, questions=[], result=result, explanation=explanation, ) # --------------------------------------------------------------------------- # Async Claude-enriched entry point (used by the router) # --------------------------------------------------------------------------- async def ai_frame_async(request: AIFrameRequest) -> AIFrameResponse: """Async entry point: rule engine + Claude enrichment. Falls back to ai_frame() if ANTHROPIC_API_KEY is not configured. Qwen3.6 (MacStudio) will replace Claude calls when available. """ base = ai_frame(request) client = _get_claude_client() if client is None: return base messages = request.messages or [] if not messages: # Round 1: context-aware questions enriched_questions = await _claude_questions(client, request.context, base.questions) return AIFrameResponse(done=False, questions=enriched_questions, result=None, explanation=None) else: # Round 2: enriched explanation enriched_explanation = await _claude_explanation(client, request, base) return AIFrameResponse( done=True, questions=[], result=base.result, explanation=enriched_explanation or base.explanation, ) # --------------------------------------------------------------------------- # Claude helpers # --------------------------------------------------------------------------- def _get_claude_client(): """Return an AsyncAnthropic client if ANTHROPIC_API_KEY is set, else None.""" try: from app.config import settings if not settings.ANTHROPIC_API_KEY: return None import anthropic return anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) except Exception: return None async def _claude_questions(client, context: str | None, fallback: list[AIQuestion]) -> list[AIQuestion]: """Generate context-aware clarifying questions. Falls back to standard questions on error.""" if not context: return fallback from app.config import settings prompt = f"""Tu assistes à la qualification d'une décision collective dans la communauté Duniter/G1 (monnaie libre). Contexte de la décision : {context} Génère 2 questions de clarification courtes et précises pour mieux qualifier cette décision. Chaque question doit avoir exactement 3 options de réponse. Les questions doivent porter sur la réversibilité et l'urgence, adaptées au contexte. Réponds UNIQUEMENT en JSON valide, sans texte avant ni après : [ {{"id": "reversibility", "text": "...", "options": ["option1", "option2", "option3"]}}, {{"id": "urgency", "text": "...", "options": ["option1", "option2", "option3"]}} ]""" try: message = await client.messages.create( model=settings.ANTHROPIC_MODEL, max_tokens=512, messages=[{"role": "user", "content": prompt}], ) raw = message.content[0].text.strip() start = raw.find("[") end = raw.rfind("]") + 1 data = json.loads(raw[start:end]) return [AIQuestion(id=q["id"], text=q["text"], options=q["options"]) for q in data] except Exception as exc: logger.warning("Claude question generation failed: %s — using fallback", exc) return fallback async def _claude_explanation(client, request: AIFrameRequest, base: AIFrameResponse) -> str | None: """Generate a contextual explanation for the qualification result.""" if base.result is None: return None from app.config import settings answers = _parse_answers(request.messages or []) rev = answers.get("reversibility", "non précisé") urg = answers.get("urgency", "non précisé") context_line = f"\nContexte : {request.context}" if request.context else "" prompt = f"""Tu qualifies une décision collective pour la communauté Duniter/G1 (monnaie libre).{context_line} Paramètres : - Personnes concernées : {request.affected_count or 'non précisé'} - Dans le cadre d'un mandat : {'oui' if request.within_mandate else 'non'} - Décision structurante (on-chain) : {'oui' if request.is_structural else 'non'} - Réversibilité : {rev} - Urgence : {urg} Résultat du moteur de qualification : - Type : {base.result.decision_type} - Processus recommandé : {base.result.process} - Modalités : {', '.join(base.result.recommended_modalities) or 'aucune'} - Gravure on-chain : {'recommandée' if base.result.recommend_onchain else 'non nécessaire'} Rédige une explication courte (2-3 phrases) qui explique pourquoi ce processus est recommandé \ pour cette décision spécifique. Sois direct et concis. Réponds en français.""" try: message = await client.messages.create( model=settings.ANTHROPIC_MODEL, max_tokens=300, messages=[{"role": "user", "content": prompt}], ) return message.content[0].text.strip() except Exception as exc: logger.warning("Claude explanation generation failed: %s — using fallback", exc) return None # --------------------------------------------------------------------------- # Rule-based helpers (shared by sync and async paths) # --------------------------------------------------------------------------- def _parse_answers(messages: list[AIMessage]) -> dict[str, str]: """Extract question answers from the last user message. Expected format: "reversibility:|urgency:" """ answers: dict[str, str] = {} for msg in reversed(messages): if msg.role == "user" and "|" in msg.content and ":" in msg.content: for part in msg.content.split("|"): if ":" in part: key, _, val = part.partition(":") answers[key.strip()] = val.strip() break return answers def _build_result(request: AIFrameRequest, answers: dict[str, str]) -> AIQualifyResult: """Produce a qualification result enriched by the AI answers.""" from app.engine.qualifier import ( QualificationConfig, QualificationInput, qualify, ) config = QualificationConfig() inp = QualificationInput( within_mandate=request.within_mandate, affected_count=request.affected_count, is_structural=request.is_structural, context_description=request.context, ) base = qualify(inp, config) reasons = list(base.reasons) reversibility = answers.get("reversibility", "") if "irréversible" in reversibility.lower(): reasons.append("Décision irréversible : consensus élevé recommandé.") urgency = answers.get("urgency", "") if "urgente" in urgency.lower() or "< 1" in urgency: reasons.append("Urgence signalée : privilégier un protocole à délai court.") return AIQualifyResult( decision_type=base.decision_type.value, process=base.process, recommended_modalities=base.recommended_modalities, recommend_onchain=base.recommend_onchain, onchain_reason=base.onchain_reason, confidence=base.confidence, collective_available=base.collective_available, record_in_observatory=base.record_in_observatory, reasons=reasons, ) def _build_explanation(answers: dict[str, str]) -> str: parts = [] rev = answers.get("reversibility", "") urg = answers.get("urgency", "") if rev: parts.append(f"Réversibilité : {rev}.") if urg: parts.append(f"Urgence : {urg}.") return " ".join(parts) if parts else "Qualification basée sur les éléments fournis."