Initial commit: SejeteralO water tarification platform

Full-stack app for participatory water pricing using Bezier curves.
- Backend: FastAPI + SQLAlchemy + SQLite with JWT auth
- Frontend: Nuxt 4 + TypeScript with interactive SVG editor
- Math engine: cubic Bezier tarification with Cardano solver
- Admin: commune management, household import, vote monitoring, CMS
- Citizen: interactive curve editor, vote submission
- Docker-compose deployment ready

Includes fixes for:
- Impact table snake_case/camelCase property mismatch
- CMS content backend API + frontend editor (was stub)
- Admin route protection middleware
- Public content display on commune page
- Vote confirmation page link fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Yvv
2026-02-21 15:26:02 +01:00
commit b30e54a8f7
67 changed files with 16723 additions and 0 deletions

View File

@@ -0,0 +1,23 @@
from app.engine.integrals import compute_integrals
from app.engine.pricing import (
HouseholdData,
TariffResult,
compute_p0,
compute_tariff,
compute_impacts,
)
from app.engine.current_model import compute_linear_tariff, LinearTariffResult
from app.engine.median import VoteParams, compute_median
__all__ = [
"compute_integrals",
"HouseholdData",
"TariffResult",
"compute_p0",
"compute_tariff",
"compute_impacts",
"compute_linear_tariff",
"LinearTariffResult",
"VoteParams",
"compute_median",
]

View File

@@ -0,0 +1,66 @@
"""
Current (linear) pricing model.
Ported from eau.py:256-354 (CurrentModel).
Pure Python + numpy, no matplotlib.
"""
from dataclasses import dataclass
from app.engine.pricing import HouseholdData
@dataclass
class LinearTariffResult:
"""Result of the linear tariff computation."""
p0: float # flat price per m³
curve_volumes: list[float]
curve_bills_rp: list[float]
curve_bills_rs: list[float]
curve_price_m3_rp: list[float]
curve_price_m3_rs: list[float]
def compute_linear_tariff(
households: list[HouseholdData],
recettes: float,
abop: float,
abos: float,
vmax: float = 2100,
nbpts: int = 200,
) -> LinearTariffResult:
"""
Compute the linear (current) pricing model.
p0 = (recettes - Σ abo) / Σ volume
"""
total_abo = 0.0
total_volume = 0.0
for h in households:
abo = abos if h.status == "RS" else abop
total_abo += abo
total_volume += max(h.volume_m3, 1e-5)
if total_abo >= recettes or total_volume == 0:
p0 = 0.0
else:
p0 = (recettes - total_abo) / total_volume
# Generate curves
import numpy as np
vv = np.linspace(1e-5, vmax, nbpts)
bills_rp = abop + p0 * vv
bills_rs = abos + p0 * vv
price_m3_rp = abop / vv + p0
price_m3_rs = abos / vv + p0
return LinearTariffResult(
p0=p0,
curve_volumes=vv.tolist(),
curve_bills_rp=bills_rp.tolist(),
curve_bills_rs=bills_rs.tolist(),
curve_price_m3_rp=price_m3_rp.tolist(),
curve_price_m3_rs=price_m3_rs.tolist(),
)

View File

@@ -0,0 +1,118 @@
"""
Integral computation for Bézier tariff curves.
Ported from eau.py:169-211 (NewModel.computeIntegrals).
Pure Python + numpy, no matplotlib.
"""
import numpy as np
def compute_integrals(
volume: float,
vinf: float,
vmax: float,
pmax: float,
a: float,
b: float,
c: float,
d: float,
e: float,
) -> tuple[float, float, float]:
"""
Compute (alpha1, alpha2, beta2) for a given consumption volume.
The total bill for a household consuming `volume` m³ is:
bill = abo + alpha1 * p0 + alpha2 * p0 + beta2
where p0 is the inflection price (computed separately to balance revenue).
Args:
volume: consumption in m³ for this household
vinf: inflection volume separating the two tiers
vmax: maximum volume (price = pmax at this volume)
pmax: maximum price per m³
a, b: shape parameters for tier 1 Bézier curve
c, d, e: shape parameters for tier 2 Bézier curve
Returns:
(alpha1, alpha2, beta2) tuple
"""
if volume <= vinf:
# Tier 1 only
T = _solve_tier1_t(volume, vinf, b)
alpha1 = _compute_alpha1(T, vinf, a, b)
return alpha1, 0.0, 0.0
else:
# Full tier 1 (T=1) + partial tier 2
alpha1 = _compute_alpha1(1.0, vinf, a, b)
# Tier 2
wmax = vmax - vinf
T = _solve_tier2_t(volume - vinf, wmax, c, d)
uu = _compute_uu(T, c, d, e)
alpha2 = (volume - vinf) - 3 * uu * wmax
beta2 = 3 * pmax * wmax * uu
return alpha1, alpha2, beta2
def _solve_tier1_t(volume: float, vinf: float, b: float) -> float:
"""Find T such that v(T) = volume for tier 1."""
if volume == 0:
return 0.0
if volume >= vinf:
return 1.0
# Solve: vinf * [(1 - 3b) * T³ + 3b * T²] = volume
# => (1-3b) * T³ + 3b * T² - volume/vinf = 0
p = [1 - 3 * b, 3 * b, 0, -volume / vinf]
roots = np.roots(p)
roots = np.unique(roots)
real_roots = np.real(roots[np.isreal(roots)])
mask = (real_roots <= 1.0) & (real_roots >= 0.0)
return float(real_roots[mask][0])
def _solve_tier2_t(w: float, wmax: float, c: float, d: float) -> float:
"""Find T such that w(T) = w for tier 2, where w = volume - vinf."""
if w == 0:
return 0.0
if w >= wmax:
return 1.0
# Solve: wmax * [(3(c+d-cd)-2)*T³ + 3(1-2c-d+cd)*T² + 3c*T] = w
p = [
3 * (c + d - c * d) - 2,
3 * (1 - 2 * c - d + c * d),
3 * c,
-w / wmax,
]
roots = np.roots(p)
roots = np.unique(roots)
real_roots = np.real(roots[np.isreal(roots)])
mask = (real_roots <= 1.0 + 1e-10) & (real_roots >= -1e-10)
if not mask.any():
# Fallback: closest root to [0,1]
return float(np.clip(np.real(roots[0]), 0.0, 1.0))
return float(np.clip(real_roots[mask][0], 0.0, 1.0))
def _compute_alpha1(T: float, vinf: float, a: float, b: float) -> float:
"""Compute alpha1 coefficient for tier 1."""
return 3 * vinf * (
T**6 / 6 * (-9 * a * b + 3 * a + 6 * b - 2)
+ T**5 / 5 * (24 * a * b - 6 * a - 13 * b + 3)
+ 3 * T**4 / 4 * (-7 * a * b + a + 2 * b)
+ T**3 / 3 * 6 * a * b
)
def _compute_uu(T: float, c: float, d: float, e: float) -> float:
"""Compute the uu intermediate value for tier 2."""
return (
(-3 * c * d + 9 * e * c * d + 3 * c - 9 * e * c + 3 * d - 9 * e * d + 6 * e - 2) * T**6 / 6
+ (2 * c * d - 15 * e * c * d - 4 * c + 21 * e * c - 2 * d + 15 * e * d - 12 * e + 2) * T**5 / 5
+ (6 * e * c * d + c - 15 * e * c - 6 * e * d + 6 * e) * T**4 / 4
+ (3 * e * c) * T**3 / 3
)

View File

@@ -0,0 +1,48 @@
"""
Median computation for vote parameters.
Computes the element-wise median of (vinf, a, b, c, d, e) across all active votes.
This parametric median is chosen over geometric median because:
- It's transparent and politically explainable
- The result is itself a valid set of Bézier parameters
"""
import numpy as np
from dataclasses import dataclass
@dataclass
class VoteParams:
"""The 6 citizen-adjustable parameters."""
vinf: float
a: float
b: float
c: float
d: float
e: float
def compute_median(votes: list[VoteParams]) -> VoteParams | None:
"""
Compute element-wise median of vote parameters.
Returns None if no votes provided.
"""
if not votes:
return None
vinfs = [v.vinf for v in votes]
a_s = [v.a for v in votes]
b_s = [v.b for v in votes]
c_s = [v.c for v in votes]
d_s = [v.d for v in votes]
e_s = [v.e for v in votes]
return VoteParams(
vinf=float(np.median(vinfs)),
a=float(np.median(a_s)),
b=float(np.median(b_s)),
c=float(np.median(c_s)),
d=float(np.median(d_s)),
e=float(np.median(e_s)),
)

View File

@@ -0,0 +1,196 @@
"""
Pricing computation for Bézier tariff model.
Ported from eau.py:120-167 (NewModel.updateComputation).
Pure Python + numpy, no matplotlib.
"""
import numpy as np
from dataclasses import dataclass
from app.engine.integrals import compute_integrals
@dataclass
class HouseholdData:
"""Minimal household data needed for computation."""
volume_m3: float
status: str # "RS", "RP", or "PRO"
price_paid_eur: float = 0.0
@dataclass
class TariffResult:
"""Result of a full tariff computation."""
p0: float
curve_volumes: list[float]
curve_prices_m3: list[float]
curve_bills_rp: list[float]
curve_bills_rs: list[float]
household_bills: list[float] # projected bill for each household
@dataclass
class ImpactRow:
"""Price impact for a specific volume level."""
volume: float
old_price: float
new_price_rp: float
new_price_rs: float
def compute_p0(
households: list[HouseholdData],
recettes: float,
abop: float,
abos: float,
vinf: float,
vmax: float,
pmax: float,
a: float,
b: float,
c: float,
d: float,
e: float,
) -> float:
"""
Compute p0 (inflection price) that balances total revenue.
p0 = (R - Σ(abo + β₂)) / Σ(α₁ + α₂)
"""
total_abo = 0.0
total_alpha = 0.0
total_beta = 0.0
for h in households:
abo = abos if h.status == "RS" else abop
total_abo += abo
vol = max(h.volume_m3, 1e-5) # avoid div by 0
alpha1, alpha2, beta2 = compute_integrals(vol, vinf, vmax, pmax, a, b, c, d, e)
total_alpha += alpha1 + alpha2
total_beta += beta2
if total_abo >= recettes:
return 0.0
if total_alpha == 0:
return 0.0
return (recettes - total_abo - total_beta) / total_alpha
def compute_tariff(
households: list[HouseholdData],
recettes: float,
abop: float,
abos: float,
vinf: float,
vmax: float,
pmax: float,
a: float,
b: float,
c: float,
d: float,
e: float,
nbpts: int = 200,
) -> TariffResult:
"""
Full tariff computation: p0, price curves, and per-household bills.
"""
p0 = compute_p0(households, recettes, abop, abos, vinf, vmax, pmax, a, b, c, d, e)
# Generate curve points
tt = np.linspace(0, 1 - 1e-6, nbpts)
# Tier 1 volumes and prices
vv1 = vinf * ((1 - 3 * b) * tt**3 + 3 * b * tt**2)
prix_m3_1 = p0 * ((3 * a - 2) * tt**3 + (-6 * a + 3) * tt**2 + 3 * a * tt)
# Tier 2 volumes and prices
vv2 = vinf + (vmax - vinf) * (
(3 * (c + d - c * d) - 2) * tt**3
+ 3 * (1 - 2 * c - d + c * d) * tt**2
+ 3 * c * tt
)
prix_m3_2 = p0 + (pmax - p0) * ((1 - 3 * e) * tt**3 + 3 * e * tt**2)
vv = np.concatenate([vv1, vv2])
prix_m3 = np.concatenate([prix_m3_1, prix_m3_2])
# Compute full bills (integral) for each curve point
alpha1_arr = np.zeros(len(vv))
alpha2_arr = np.zeros(len(vv))
beta2_arr = np.zeros(len(vv))
for iv, v in enumerate(vv):
alpha1_arr[iv], alpha2_arr[iv], beta2_arr[iv] = compute_integrals(
v, vinf, vmax, pmax, a, b, c, d, e
)
bills_rp = abop + (alpha1_arr + alpha2_arr) * p0 + beta2_arr
bills_rs = abos + (alpha1_arr + alpha2_arr) * p0 + beta2_arr
# Per-household projected bills
household_bills = []
for h in households:
vol = max(h.volume_m3, 1e-5)
abo = abos if h.status == "RS" else abop
a1, a2, b2 = compute_integrals(vol, vinf, vmax, pmax, a, b, c, d, e)
household_bills.append(abo + (a1 + a2) * p0 + b2)
return TariffResult(
p0=p0,
curve_volumes=vv.tolist(),
curve_prices_m3=prix_m3.tolist(),
curve_bills_rp=bills_rp.tolist(),
curve_bills_rs=bills_rs.tolist(),
household_bills=household_bills,
)
def compute_impacts(
households: list[HouseholdData],
recettes: float,
abop: float,
abos: float,
vinf: float,
vmax: float,
pmax: float,
a: float,
b: float,
c: float,
d: float,
e: float,
reference_volumes: list[float] | None = None,
) -> tuple[float, list[ImpactRow]]:
"""
Compute p0 and price impacts for reference volume levels.
Returns (p0, list of ImpactRow).
"""
if reference_volumes is None:
reference_volumes = [30, 60, 90, 150, 300]
p0 = compute_p0(households, recettes, abop, abos, vinf, vmax, pmax, a, b, c, d, e)
# Compute average 2018 price per m³ for a rough "old price" baseline
total_vol = sum(max(h.volume_m3, 1e-5) for h in households)
total_abo_old = sum(abos if h.status == "RS" else abop for h in households)
old_p_m3 = (recettes - total_abo_old) / total_vol if total_vol > 0 else 0
impacts = []
for vol in reference_volumes:
# Old price (linear model)
old_price_rp = abop + old_p_m3 * vol
# New price
a1, a2, b2 = compute_integrals(vol, vinf, vmax, pmax, a, b, c, d, e)
new_price_rp = abop + (a1 + a2) * p0 + b2
new_price_rs = abos + (a1 + a2) * p0 + b2
impacts.append(ImpactRow(
volume=vol,
old_price=old_price_rp,
new_price_rp=new_price_rp,
new_price_rs=new_price_rs,
))
return p0, impacts