Files
sejeteralo/eau.py
Yvv b30e54a8f7 Initial commit: SejeteralO water tarification platform
Full-stack app for participatory water pricing using Bezier curves.
- Backend: FastAPI + SQLAlchemy + SQLite with JWT auth
- Frontend: Nuxt 4 + TypeScript with interactive SVG editor
- Math engine: cubic Bezier tarification with Cardano solver
- Admin: commune management, household import, vote monitoring, CMS
- Citizen: interactive curve editor, vote submission
- Docker-compose deployment ready

Includes fixes for:
- Impact table snake_case/camelCase property mismatch
- CMS content backend API + frontend editor (was stub)
- Admin route protection middleware
- Public content display on commune page
- Vote confirmation page link fix

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 15:26:02 +01:00

356 lines
17 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Jun 18 2022
Backend for water price computation
@author: alex delga
"""
import xlrd
import xlwt
import numpy as np
import matplotlib.pyplot as plt # main ploting module
from matplotlib.widgets import Slider
import pandas as pd
import seaborn as sns
sns.set_theme()
class NewModel():
def __init__(self,pmax=20,vmax=2100): # Set the static parameters
self.pmax=pmax #maximum price per m3 acceptable
self.vmax=vmax #consumed volume in m3 for which such price is reached. All inhabitants must be under.
self.nbpts=1023
self.tt=np.linspace(0,1-1e-6,self.nbpts) # the -1e-6 is to avoid pb at vv=vmax later
self.vv=np.zeros((2*self.nbpts,))#must be reset parametrically using tt everytime a,b,c,d,e,vinf are changed.
self.abopArray = np.linspace(0,200,201) # potential values for abop
self.abosArray = np.linspace(0,200,201) # potential values for abos
self.recettesArray = np.linspace (50000,100000,51) # potential values for recettes
self.vinfArray = np.linspace(0,vmax,vmax+1) # potential values for volume at inflexion point
self.aArray = np.linspace(0,1,51) # potential values for a
self.bArray = np.linspace(0,1,51) # potential values for b
self.cArray = np.linspace(0,1,51) # potential values for c
self.dArray = np.linspace(0,1,51) # potential values for d
self.eArray = np.linspace(0,1,51) # potential values for e
#loading the inhabitant data
book = xlrd.open_workbook('Eau2018.xls')
sheet = book.sheet_by_name('CALCULS')
self.nbHab=363
self.volume = np.array([sheet.cell_value(r,4) for r in range(1,self.nbHab+1)])#get the volume used in 2018 in m3
self.vTot=np.sum(self.volume)
self.volume[self.volume==0]=1e-5 #little trick to avoid divide by 0
self.status = np.array([sheet.cell_value(r,3) for r in range(1,self.nbHab+1)])#get the RS, RP, PRO status
self.prix2018 = np.array([sheet.cell_value(r,33) for r in range(1,self.nbHab+1)])#get the price paid in 2018 in EUR
self.prix2018_m3 = self.prix2018 / self.volume
def mainFunction(self):
'''
main function, the one called in the notebook, and that update computations depending on the slider values.
'''
# initial values
self.abop= self.abopArray[100]
self.abos= self.abosArray[100]
self.recettes= self.recettesArray[25]
self.vinf=self.vinfArray[int(self.vmax/2)]
self.a=self.aArray[25]
self.b=self.bArray[25]
self.c=self.cArray[25]
self.d=self.dArray[25]
self.e=self.eArray[25]
# initial plot
self.updateComputation()
self.fig = plt.figure(figsize=(9,6))
self.ax1 = plt.subplot(211)
self.ax2 = plt.subplot(212,sharex=self.ax1)
#self.ax1.set_xlabel(r'volume ($m^3$) ')
self.ax2.set_xlabel(r'volume ($m^3$) ')
self.ax1.set_ylabel('Facture totale (€)')
self.ax2.set_ylabel('Prix au m3 (€)')
#default values of ax limits
self.axx=(min(self.vv),max(self.vv))
self.ax1y=(0,max(self.prixp))
self.ax2y=(0,self.pmax)
self.updatePlot()
# SLIDERS
axcolor = 'lightgoldenrodyellow'
abopAxis = plt.axes([0.7, 0.70, 0.2, 0.03], facecolor=axcolor)
self.abopSlider = Slider(abopAxis, r'Abo. P/PRO (€)', np.min(self.abopArray), np.max(self.abopArray), self.abop)
self.abopSlider.on_changed(self.updateParam)
abosAxis = plt.axes([0.7, 0.65, 0.2, 0.03], facecolor=axcolor)
self.abosSlider = Slider(abosAxis, r'Abo. S (€)', np.min(self.abosArray), np.max(self.abosArray), self.abos)
self.abosSlider.on_changed(self.updateParam)
recettesAxis = plt.axes([0.7, 0.60, 0.2, 0.03], facecolor=axcolor)
self.recettesSlider = Slider(recettesAxis, r'Recettes (€)', np.min(self.recettesArray), np.max(self.recettesArray), self.recettes)
self.recettesSlider.on_changed(self.updateParam)
vinfAxis = plt.axes([0.7, 0.55, 0.2, 0.03], facecolor=axcolor)
self.vinfSlider = Slider(vinfAxis, r'$v_{0}$', np.min(self.vinfArray), np.max(self.vinfArray), self.vinf)
self.vinfSlider.on_changed(self.updateParam)
aAxis = plt.axes([0.7, 0.5, 0.2, 0.03], facecolor=axcolor)
self.aSlider = Slider(aAxis, r'a', np.min(self.aArray), np.max(self.aArray), self.a)
self.aSlider.on_changed(self.updateParam)
bAxis = plt.axes([0.7, 0.45, 0.2, 0.03], facecolor=axcolor)
self.bSlider = Slider(bAxis, r'b', np.min(self.bArray), np.max(self.bArray), self.b)
self.bSlider.on_changed(self.updateParam)
cAxis = plt.axes([0.7, 0.4, 0.2, 0.03], facecolor=axcolor)
self.cSlider = Slider(cAxis, r'c', np.min(self.cArray), np.max(self.cArray), self.c)
self.cSlider.on_changed(self.updateParam)
dAxis = plt.axes([0.7, 0.35, 0.2, 0.03], facecolor=axcolor)
self.dSlider = Slider(dAxis, r'd', np.min(self.dArray), np.max(self.dArray), self.dArray[25])
self.dSlider.on_changed(self.updateParam)
eAxis = plt.axes([0.7, 0.3, 0.2, 0.03], facecolor=axcolor)
self.eSlider = Slider(eAxis, r'e', np.min(self.eArray), np.max(self.eArray), self.eArray[25])
self.eSlider.on_changed(self.updateParam)
def updateComputation(self): # The computation, parameter dependant
#first deal with abonnements
mask= self.status=='RS'
self.abo= self.abop*np.ones((self.nbHab,))
self.abo[self.status=='RS']=self.abos
#second deal with consumption and total prices
self.prixConso=np.zeros(self.vv.shape)
self.prixp=np.zeros(self.vv.shape)
self.prixs=np.zeros(self.vv.shape)
if np.sum(self.abo)>=self.recettes:
self.p0=0
self.prixp = self.abop
self.prixs = self.abos
else:
#find p0 to balance income.
alpha1=np.zeros((self.nbHab,))# préfacteur de p_0 dans la tranche 1
alpha2=np.zeros((self.nbHab,))# préfacteur de p_0 dans la tranche 2
beta2=np.zeros((self.nbHab,))# constante dans la tranche 2
#compute consumption integral on all inhabitants
for ih in range(self.nbHab):
alpha1[ih],alpha2[ih],beta2[ih]=self.computeIntegrals(self.volume[ih])
self.p0=(self.recettes-np.sum(beta2+self.abo))/np.sum(alpha1+alpha2) # prix d'inflexion
self.prixProj = (alpha1+alpha2)*self.p0+beta2 #Calcul du prix payé par les habitantes dans le nouveau scénario
#compute consumption price per m3
#tranche 1
self.vv[0:self.nbpts]=self.vinf*((1-3*self.b)*(self.tt**3)+3*self.b*(self.tt**2))
self.prixConso[0:self.nbpts]=self.p0*((3*self.a-2)*(self.tt**3)+(-6*self.a+3)*(self.tt**2)+3*self.a*self.tt)
#tranche 2
self.vv[self.nbpts:]=self.vinf+(self.vmax-self.vinf)*((3*(self.c+self.d-self.c*self.d)-2)*(self.tt**3)+\
3*(1-2*self.c-self.d+self.c*self.d)*(self.tt**2)+3*self.c*self.tt)
self.prixConso[self.nbpts:]=self.p0+(self.pmax-self.p0)*((1-3*self.e)*(self.tt**3)+3*self.e*(self.tt**2))
#compute full consumption price (integral of price per m3)
alpha1=np.zeros(self.vv.shape)# préfacteur de p_0 dans la tranche 1
alpha2=np.zeros(self.vv.shape)# préfacteur de p_0 dans la tranche 2
beta2=np.zeros(self.vv.shape)# constante dans la tranche 2
for iv,vv in enumerate(self.vv):
alpha1[iv],alpha2[iv],beta2[iv]=self.computeIntegrals(vv)
self.prixp = self.abop+(alpha1+alpha2)*self.p0+beta2
self.prixs = self.abos+(alpha1+alpha2)*self.p0+beta2
#now compute prices
self.prixp_m3 = self.prixp/self.vv
self.prixs_m3 = self.prixs/self.vv
def computeIntegrals(self,vv):#return alpha1, alpha2 and beta for an input volume
a=self.a
b=self.b
c=self.c
d=self.d
e=self.e
if vv<=self.vinf:# on ne travaille que dans la tranche 1.
#Find the value of T by solving the equation V=v(T)
if vv==0: #remove extrema points where root finding can be problematic
T=0.0
elif vv==self.vinf:
T=1.0
else:
p = [1-3*b, 3*b, 0, -vv/self.vinf]# polynomial representation of the equation
roots = np.roots(p)
roots = np.unique(roots)#remove duplicates
roots2 = np.real(roots[np.isreal(roots)]) # take only real roots
mask = (roots2<=1.) & (roots2>=0.)#check that T is real and between 0 and 1
T=float(roots2[mask])
alpha1=3*self.vinf*(T**6/6*(-9*a*b+3*a+6*b-2)+T**5/5*(24*a*b-6*a-13*b+3)+3*T**4/4*(-7*a*b+a+2*b)+T**3/3*6*a*b)
return alpha1,0,0
else:
alpha1=3*self.vinf*(1/6*(-9*a*b+3*a+6*b-2)+1/5*(24*a*b-6*a-13*b+3)+3/4*(-7*a*b+a+2*b)+1/3*6*a*b)# tranche 1 avec T=1
#tranche 2
wmax=self.vmax-self.vinf
#Find the value of T by solving the equation V-vinf=w(T)
if vv==self.vinf: #remove extrema points where root finding can be problematic
T=0.0
elif vv==self.vmax:
T=1.0
else:
p = [3*(c+d-c*d)-2, 3*(1-2*c-d+c*d), 3*c, -(vv-self.vinf)/wmax]
roots = np.roots(p)
roots = np.unique(roots)#remove duplicates
roots2 = np.real(roots[np.isreal(roots)]) # take only real roots
mask = (roots2<=1.) & (roots2>=0.)#check that T is real and between 0 and 1#
if not mask.any(): # for a weird reason, if vv=vmax, we have roots2=1.0 but it does not pass roots2<=1.0 ????
print(vv,roots,roots2,mask)
T=float(np.real(roots2[mask]))
uu=(-3*c*d+9*e*c*d+3*c-9*e*c+3*d-9*e*d+6*e -2)*T**6/6+(2*c*d-15*e*c*d-4*c+21*e*c-2*d+15*e*d-12*e+2)*T**5/5 +(6*e*c*d+c-15*e*c-6*e*d+6*e)*T**4/4+(3*e*c)*T**3/3
alpha2=vv-self.vinf-3*uu*wmax
beta2=3*self.pmax*wmax*uu
return alpha1,alpha2,beta2
def updatePlot(self): # The plot update
self.ax1.plot(self.vv,self.prixp,label='RP/PRO')
self.ax1.plot(self.vv,self.prixs,label='RS')
self.ax1.scatter(self.volume,self.prix2018,s=3,color='g',label='2018')
self.ax2.plot(self.vv,self.prixp_m3,label='RP/PRO')
self.ax2.plot(self.vv,self.prixs_m3,label='RS')
self.ax2.plot(self.vv,self.prixConso,'k-',label='Conso')
self.ax2.plot([self.vinf,self.vinf],[0,self.p0],color = '0.75')
self.ax2.plot([0,self.vinf],[self.p0,self.p0],color = '0.75')
self.ax2.scatter(self.volume,self.prix2018_m3,s=3,color='g',label='2018')
self.ax1.set_xlim(self.axx)
self.ax1.set_ylim(self.ax1y)
self.ax2.set_ylim(self.ax2y)
#self.ax2.text(0.9, 0.2, '$p_0=${:.2f} €'.format(self.p0),ha='right', va='bottom', transform=self.ax2.transAxes)
self.ax2.legend()
self.ax2.set_xlabel(r'volume ($m^3$) ')
self.ax1.set_ylabel('Facture totale (€)')
self.ax2.set_ylabel('Prix au m3 (€)')
plt.tight_layout()
plt.subplots_adjust(top = 0.9,right = 0.5)
def updateParam(self,val): # Update the parameter using the slider values
#get current values of ax limits
self.axx=self.ax1.get_xlim()
self.ax1y=self.ax1.get_ylim()
self.ax2y=self.ax2.get_ylim()
self.ax1.clear()
self.ax2.clear()
self.abop= self.abopSlider.val
self.abos= self.abosSlider.val
self.recettes= self.recettesSlider.val
self.vinf=self.vinfSlider.val
self.a=self.aSlider.val
self.b=self.bSlider.val
self.c=self.cSlider.val
self.d=self.dSlider.val
self.e=self.eSlider.val
self.updateComputation()
self.updatePlot() #
class CurrentModel():
def __init__(self): # Set the static parameters
self.vv = np.linspace(0,2100,2101) # volume values
self.vv[0]=1e-5 #little trick to avoid divide by 0
self.abopArray = np.linspace(0,200,201) # potential values for abop
self.abosArray = np.linspace(0,200,201) # potential values for abos
self.recettesArray = np.linspace (50000,100000,51) # potential values for recettes
#loading the inhabitant data
book = xlrd.open_workbook('Eau2018.xls')
sheet = book.sheet_by_name('CALCULS')
self.nbHab=363
self.volume = np.array([sheet.cell_value(r,4) for r in range(1,self.nbHab+1)])#get the volume used in 2018 in m3
self.vTot=np.sum(self.volume)
self.volume[self.volume==0]=1e-5 #little trick to avoid divide by 0
self.status = np.array([sheet.cell_value(r,3) for r in range(1,self.nbHab+1)])#get the RS, RP, PRO status
self.prix2018 = np.array([sheet.cell_value(r,33) for r in range(1,self.nbHab+1)])#get the price paid in 2018 in EUR
self.prix2018_m3 = self.prix2018 / self.volume
def mainFunction(self):
'''
main function, the one called in the notebook, and that update computations depending on the slider values.
'''
self.fig = plt.figure(figsize=(8,5))
self.ax1 = plt.subplot(121)
self.ax2 = plt.subplot(122,sharex=self.ax1)
self.ax1.set_xlabel(r'volume ($m^3$) ')
self.ax2.set_xlabel(r'volume ($m^3$) ')
self.ax1.set_ylabel('Facture totale (€)')
self.ax2.set_ylabel('Prix au m3 (€)')
# initial values
self.abop= self.abopArray[100]
self.abos= self.abosArray[100]
self.recettes= self.recettesArray[25]
# initial plot
self.updateComputation()
self.updatePlot()
# SLIDERS
axcolor = 'lightgoldenrodyellow'
abopAxis = plt.axes([0.2, 0.15, 0.65, 0.05], facecolor=axcolor)
self.abopSlider = Slider(abopAxis, r'Abo. P/PRO (EUR)', np.min(self.abopArray), np.max(self.abopArray), self.abopArray[100])
self.abopSlider.on_changed(self.updateParam)
abosAxis = plt.axes([0.2, 0.1, 0.65, 0.05], facecolor=axcolor)
self.abosSlider = Slider(abosAxis, r'Abo. S (EUR)', np.min(self.abosArray), np.max(self.abosArray), self.abosArray[100])
self.abosSlider.on_changed(self.updateParam)
recettesAxis = plt.axes([0.2, 0.05, 0.65, 0.05], facecolor=axcolor)
self.recettesSlider = Slider(recettesAxis, r'Recettes (EUR)', np.min(self.recettesArray), np.max(self.recettesArray), self.recettesArray[25])
self.recettesSlider.on_changed(self.updateParam)
def updateComputation(self): # The computation, parameter dependant
#first deal with abonnements
mask= self.status=='RS'
self.abo= self.abop*np.ones((self.nbHab,))
self.abo[self.status=='RS']=self.abos
#second deal with consumption
if np.sum(self.abo)>=self.recettes:
self.p0=0
else:
self.p0= (self.recettes-np.sum(self.abo))/self.vTot
#now deal with prices
self.prixp = self.abop+self.p0*self.vv
self.prixs = self.abos+self.p0*self.vv
self.prixp_m3 = self.abop/self.vv+self.p0
self.prixs_m3 = self.abos/self.vv+self.p0
def updatePlot(self): # The plot update
self.ax1.plot(self.vv,self.prixp,label='RP/PRO')
self.ax1.plot(self.vv,self.prixs,label='RS')
self.ax1.scatter(self.volume,self.prix2018,s=3,color='g',label='2018')
self.ax2.plot(self.vv,self.prixp_m3,label='RP/PRO')
self.ax2.plot(self.vv,self.prixs_m3,label='RS')
self.ax2.scatter(self.volume,self.prix2018_m3,s=3,color='g',label='2018')
self.ax2.set_ylim(0,20)
self.ax2.text(0.9, 0.2, '$p_0=${:.2f}'.format(self.p0),ha='right', va='bottom', transform=self.ax2.transAxes)
self.ax2.legend()
self.ax1.set_xlabel(r'volume ($m^3$) ')
self.ax2.set_xlabel(r'volume ($m^3$) ')
self.ax1.set_ylabel('Facture totale (€)')
self.ax2.set_ylabel('Prix au m3 (€)')
plt.tight_layout()
plt.subplots_adjust(top = 0.9,bottom = 0.35)
def updateParam(self,val): # Update the parameter using the slider values
self.ax1.clear()
self.ax2.clear()
self.abop= self.abopSlider.val
self.abos= self.abosSlider.val
self.recettes= self.recettesSlider.val
self.updateComputation()
self.updatePlot() #