Full-stack app for participatory water pricing using Bezier curves. - Backend: FastAPI + SQLAlchemy + SQLite with JWT auth - Frontend: Nuxt 4 + TypeScript with interactive SVG editor - Math engine: cubic Bezier tarification with Cardano solver - Admin: commune management, household import, vote monitoring, CMS - Citizen: interactive curve editor, vote submission - Docker-compose deployment ready Includes fixes for: - Impact table snake_case/camelCase property mismatch - CMS content backend API + frontend editor (was stub) - Admin route protection middleware - Public content display on commune page - Vote confirmation page link fix Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
144 lines
4.4 KiB
Python
144 lines
4.4 KiB
Python
"""Service for importing household data from CSV/XLSX files."""
|
|
|
|
import io
|
|
import secrets
|
|
import string
|
|
|
|
import pandas as pd
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.models import Household
|
|
|
|
# Alphabet of unambiguous characters: uppercase letters plus digits, with the
# easily-confused O/0 and I/1 removed (lowercase l is absent from uppercase).
SAFE_CHARS = "".join(
    c for c in string.ascii_uppercase + string.digits if c not in "OI01"
)


def generate_auth_code(length: int = 8) -> str:
    """Return a random authentication code of *length* chars drawn from SAFE_CHARS."""
    picks = [secrets.choice(SAFE_CHARS) for _ in range(length)]
    return "".join(picks)
|
|
|
|
|
|
VALID_STATUSES = {"RS", "RP", "PRO"}
|
|
REQUIRED_COLUMNS = {"identifier", "status", "volume_m3", "price_eur"}
|
|
|
|
|
|
def parse_import_file(file_bytes: bytes, filename: str) -> tuple[pd.DataFrame | None, list[str]]:
|
|
"""
|
|
Parse a CSV or XLSX file and validate its contents.
|
|
|
|
Returns (dataframe, errors). If errors is non-empty, dataframe may be None.
|
|
"""
|
|
errors = []
|
|
|
|
try:
|
|
if filename.endswith(".csv"):
|
|
df = pd.read_csv(io.BytesIO(file_bytes))
|
|
elif filename.endswith((".xlsx", ".xls")):
|
|
df = pd.read_excel(io.BytesIO(file_bytes))
|
|
else:
|
|
return None, ["Format non supporté. Utilisez CSV ou XLSX."]
|
|
except Exception as e:
|
|
return None, [f"Erreur de lecture du fichier: {e}"]
|
|
|
|
# Normalize column names
|
|
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
|
|
|
|
missing = REQUIRED_COLUMNS - set(df.columns)
|
|
if missing:
|
|
return None, [f"Colonnes manquantes: {', '.join(missing)}"]
|
|
|
|
# Validate rows
|
|
for idx, row in df.iterrows():
|
|
line = idx + 2 # Excel line number (1-indexed + header)
|
|
status = str(row["status"]).strip().upper()
|
|
if status not in VALID_STATUSES:
|
|
errors.append(f"Ligne {line}: statut '{row['status']}' invalide (attendu: RS, RP, PRO)")
|
|
|
|
try:
|
|
vol = float(row["volume_m3"])
|
|
if vol < 0:
|
|
errors.append(f"Ligne {line}: volume négatif ({vol})")
|
|
except (ValueError, TypeError):
|
|
errors.append(f"Ligne {line}: volume invalide '{row['volume_m3']}'")
|
|
|
|
price = row.get("price_eur")
|
|
if pd.notna(price):
|
|
try:
|
|
p = float(price)
|
|
if p < 0:
|
|
errors.append(f"Ligne {line}: prix négatif ({p})")
|
|
except (ValueError, TypeError):
|
|
errors.append(f"Ligne {line}: prix invalide '{price}'")
|
|
|
|
# Normalize
|
|
df["status"] = df["status"].str.strip().str.upper()
|
|
df["identifier"] = df["identifier"].astype(str).str.strip()
|
|
|
|
return df, errors
|
|
|
|
|
|
async def import_households(
    db: AsyncSession,
    commune_id: int,
    df: pd.DataFrame,
) -> tuple[int, list[str]]:
    """
    Import validated households into the database.

    Returns (created_count, errors).
    """
    errors: list[str] = []
    created = 0

    # Collect every auth code already in use so freshly generated ones
    # never collide with existing households.
    code_rows = await db.execute(select(Household.auth_code))
    used_codes = set(code_rows.scalars())

    for _, record in df.iterrows():
        identifier = str(record["identifier"]).strip()
        status = str(record["status"]).strip().upper()
        volume = float(record["volume_m3"])
        raw_price = record.get("price_eur")
        price = float(raw_price) if pd.notna(raw_price) else 0.0

        # Skip rows whose identifier already exists for this commune.
        dup_result = await db.execute(
            select(Household).where(
                Household.commune_id == commune_id,
                Household.identifier == identifier,
            )
        )
        if dup_result.scalar_one_or_none():
            errors.append(f"Foyer '{identifier}' existe déjà, ignoré.")
            continue

        # Draw codes until one is free, then reserve it.
        code = generate_auth_code()
        while code in used_codes:
            code = generate_auth_code()
        used_codes.add(code)

        db.add(
            Household(
                commune_id=commune_id,
                identifier=identifier,
                status=status,
                volume_m3=volume,
                price_paid_eur=price,
                auth_code=code,
            )
        )
        created += 1

    await db.commit()
    return created, errors
|
|
|
|
|
|
def generate_template_csv() -> bytes:
    """Generate a template CSV file for household import."""
    # One example row per status; the PRO row leaves price_eur empty on purpose.
    rows = [
        "identifier,status,volume_m3,price_eur",
        "DUPONT Jean,RS,85.5,189.50",
        "MARTIN Pierre,RP,120.0,245.00",
        "SARL Boulangerie,PRO,350.0,",
    ]
    return ("\n".join(rows) + "\n").encode("utf-8")
|