"""Service for importing household data from CSV/XLSX files.""" import io import secrets import string import pandas as pd from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.models import Household # Characters without ambiguous ones (O/0/I/1/l) SAFE_CHARS = string.ascii_uppercase.replace("O", "").replace("I", "") + string.digits.replace("0", "").replace("1", "") def generate_auth_code(length: int = 8) -> str: return "".join(secrets.choice(SAFE_CHARS) for _ in range(length)) VALID_STATUSES = {"RS", "RP", "PRO"} REQUIRED_COLUMNS = {"identifier", "status", "volume_m3", "price_eur"} def parse_import_file(file_bytes: bytes, filename: str) -> tuple[pd.DataFrame | None, list[str]]: """ Parse a CSV or XLSX file and validate its contents. Returns (dataframe, errors). If errors is non-empty, dataframe may be None. """ errors = [] try: if filename.endswith(".csv"): df = pd.read_csv(io.BytesIO(file_bytes)) elif filename.endswith((".xlsx", ".xls")): df = pd.read_excel(io.BytesIO(file_bytes)) else: return None, ["Format non supporté. Utilisez CSV ou XLSX."] except Exception as e: return None, [f"Erreur de lecture du fichier: {e}"] # Normalize column names df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns] missing = REQUIRED_COLUMNS - set(df.columns) if missing: return None, [f"Colonnes manquantes: {', '.join(missing)}"] # Validate rows for idx, row in df.iterrows(): line = idx + 2 # Excel line number (1-indexed + header) status = str(row["status"]).strip().upper() if status not in VALID_STATUSES: errors.append(f"Ligne {line}: statut '{row['status']}' invalide (attendu: RS, RP, PRO)") try: vol = float(row["volume_m3"]) if vol < 0: errors.append(f"Ligne {line}: volume négatif ({vol})") except (ValueError, TypeError): errors.append(f"Ligne {line}: volume invalide '{row['volume_m3']}'") price = row.get("price_eur") if pd.notna(price): try: p = float(price) if p < 0: errors.append(f"Ligne {line}: prix négatif ({p})") except (ValueError, TypeError): errors.append(f"Ligne {line}: prix invalide '{price}'") # Normalize df["status"] = df["status"].str.strip().str.upper() df["identifier"] = df["identifier"].astype(str).str.strip() return df, errors async def import_households( db: AsyncSession, commune_id: int, df: pd.DataFrame, ) -> tuple[int, list[str]]: """ Import validated households into the database. Returns (created_count, errors). """ created = 0 errors = [] # Get existing auth codes to avoid collisions existing_codes = set() result = await db.execute(select(Household.auth_code)) for row in result.scalars(): existing_codes.add(row) for idx, row in df.iterrows(): identifier = str(row["identifier"]).strip() status = str(row["status"]).strip().upper() volume = float(row["volume_m3"]) price = float(row["price_eur"]) if pd.notna(row.get("price_eur")) else 0.0 # Check for duplicate existing = await db.execute( select(Household).where( Household.commune_id == commune_id, Household.identifier == identifier, ) ) if existing.scalar_one_or_none(): errors.append(f"Foyer '{identifier}' existe déjà, ignoré.") continue # Generate unique auth code code = generate_auth_code() while code in existing_codes: code = generate_auth_code() existing_codes.add(code) household = Household( commune_id=commune_id, identifier=identifier, status=status, volume_m3=volume, price_paid_eur=price, auth_code=code, ) db.add(household) created += 1 await db.commit() return created, errors def generate_template_csv() -> bytes: """Generate a template CSV file for household import.""" content = "identifier,status,volume_m3,price_eur\n" content += "DUPONT Jean,RS,85.5,189.50\n" content += "MARTIN Pierre,RP,120.0,245.00\n" content += "SARL Boulangerie,PRO,350.0,\n" return content.encode("utf-8")