Full-stack app for participatory water pricing using Bezier curves. - Backend: FastAPI + SQLAlchemy + SQLite with JWT auth - Frontend: Nuxt 4 + TypeScript with interactive SVG editor - Math engine: cubic Bezier tarification with Cardano solver - Admin: commune management, household import, vote monitoring, CMS - Citizen: interactive curve editor, vote submission - Docker-compose deployment ready Includes fixes for: - Impact table snake_case/camelCase property mismatch - CMS content backend API + frontend editor (was stub) - Admin route protection middleware - Public content display on commune page - Vote confirmation page link fix Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
356 lines
17 KiB
Python
356 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Sat Jun 18 2022
|
|
Backend for water price computation
|
|
@author: alex delga
|
|
"""
|
|
|
|
import xlrd
|
|
import xlwt
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt # main ploting module
|
|
from matplotlib.widgets import Slider
|
|
import pandas as pd
|
|
import seaborn as sns
|
|
sns.set_theme()
|
|
|
|
class NewModel():
|
|
def __init__(self,pmax=20,vmax=2100): # Set the static parameters
|
|
self.pmax=pmax #maximum price per m3 acceptable
|
|
self.vmax=vmax #consumed volume in m3 for which such price is reached. All inhabitants must be under.
|
|
|
|
self.nbpts=1023
|
|
self.tt=np.linspace(0,1-1e-6,self.nbpts) # the -1e-6 is to avoid pb at vv=vmax later
|
|
self.vv=np.zeros((2*self.nbpts,))#must be reset parametrically using tt everytime a,b,c,d,e,vinf are changed.
|
|
|
|
self.abopArray = np.linspace(0,200,201) # potential values for abop
|
|
self.abosArray = np.linspace(0,200,201) # potential values for abos
|
|
self.recettesArray = np.linspace (50000,100000,51) # potential values for recettes
|
|
self.vinfArray = np.linspace(0,vmax,vmax+1) # potential values for volume at inflexion point
|
|
self.aArray = np.linspace(0,1,51) # potential values for a
|
|
self.bArray = np.linspace(0,1,51) # potential values for b
|
|
self.cArray = np.linspace(0,1,51) # potential values for c
|
|
self.dArray = np.linspace(0,1,51) # potential values for d
|
|
self.eArray = np.linspace(0,1,51) # potential values for e
|
|
|
|
#loading the inhabitant data
|
|
book = xlrd.open_workbook('Eau2018.xls')
|
|
sheet = book.sheet_by_name('CALCULS')
|
|
self.nbHab=363
|
|
self.volume = np.array([sheet.cell_value(r,4) for r in range(1,self.nbHab+1)])#get the volume used in 2018 in m3
|
|
self.vTot=np.sum(self.volume)
|
|
self.volume[self.volume==0]=1e-5 #little trick to avoid divide by 0
|
|
self.status = np.array([sheet.cell_value(r,3) for r in range(1,self.nbHab+1)])#get the RS, RP, PRO status
|
|
self.prix2018 = np.array([sheet.cell_value(r,33) for r in range(1,self.nbHab+1)])#get the price paid in 2018 in EUR
|
|
self.prix2018_m3 = self.prix2018 / self.volume
|
|
|
|
|
|
def mainFunction(self):
|
|
'''
|
|
main function, the one called in the notebook, and that update computations depending on the slider values.
|
|
'''
|
|
# initial values
|
|
self.abop= self.abopArray[100]
|
|
self.abos= self.abosArray[100]
|
|
self.recettes= self.recettesArray[25]
|
|
self.vinf=self.vinfArray[int(self.vmax/2)]
|
|
self.a=self.aArray[25]
|
|
self.b=self.bArray[25]
|
|
self.c=self.cArray[25]
|
|
self.d=self.dArray[25]
|
|
self.e=self.eArray[25]
|
|
|
|
# initial plot
|
|
self.updateComputation()
|
|
|
|
self.fig = plt.figure(figsize=(9,6))
|
|
self.ax1 = plt.subplot(211)
|
|
self.ax2 = plt.subplot(212,sharex=self.ax1)
|
|
#self.ax1.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax2.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax1.set_ylabel('Facture totale (€)')
|
|
self.ax2.set_ylabel('Prix au m3 (€)')
|
|
|
|
#default values of ax limits
|
|
self.axx=(min(self.vv),max(self.vv))
|
|
self.ax1y=(0,max(self.prixp))
|
|
self.ax2y=(0,self.pmax)
|
|
|
|
self.updatePlot()
|
|
|
|
# SLIDERS
|
|
axcolor = 'lightgoldenrodyellow'
|
|
abopAxis = plt.axes([0.7, 0.70, 0.2, 0.03], facecolor=axcolor)
|
|
self.abopSlider = Slider(abopAxis, r'Abo. P/PRO (€)', np.min(self.abopArray), np.max(self.abopArray), self.abop)
|
|
self.abopSlider.on_changed(self.updateParam)
|
|
|
|
abosAxis = plt.axes([0.7, 0.65, 0.2, 0.03], facecolor=axcolor)
|
|
self.abosSlider = Slider(abosAxis, r'Abo. S (€)', np.min(self.abosArray), np.max(self.abosArray), self.abos)
|
|
self.abosSlider.on_changed(self.updateParam)
|
|
|
|
recettesAxis = plt.axes([0.7, 0.60, 0.2, 0.03], facecolor=axcolor)
|
|
self.recettesSlider = Slider(recettesAxis, r'Recettes (€)', np.min(self.recettesArray), np.max(self.recettesArray), self.recettes)
|
|
self.recettesSlider.on_changed(self.updateParam)
|
|
|
|
vinfAxis = plt.axes([0.7, 0.55, 0.2, 0.03], facecolor=axcolor)
|
|
self.vinfSlider = Slider(vinfAxis, r'$v_{0}$', np.min(self.vinfArray), np.max(self.vinfArray), self.vinf)
|
|
self.vinfSlider.on_changed(self.updateParam)
|
|
|
|
aAxis = plt.axes([0.7, 0.5, 0.2, 0.03], facecolor=axcolor)
|
|
self.aSlider = Slider(aAxis, r'a', np.min(self.aArray), np.max(self.aArray), self.a)
|
|
self.aSlider.on_changed(self.updateParam)
|
|
|
|
bAxis = plt.axes([0.7, 0.45, 0.2, 0.03], facecolor=axcolor)
|
|
self.bSlider = Slider(bAxis, r'b', np.min(self.bArray), np.max(self.bArray), self.b)
|
|
self.bSlider.on_changed(self.updateParam)
|
|
|
|
cAxis = plt.axes([0.7, 0.4, 0.2, 0.03], facecolor=axcolor)
|
|
self.cSlider = Slider(cAxis, r'c', np.min(self.cArray), np.max(self.cArray), self.c)
|
|
self.cSlider.on_changed(self.updateParam)
|
|
|
|
dAxis = plt.axes([0.7, 0.35, 0.2, 0.03], facecolor=axcolor)
|
|
self.dSlider = Slider(dAxis, r'd', np.min(self.dArray), np.max(self.dArray), self.dArray[25])
|
|
self.dSlider.on_changed(self.updateParam)
|
|
|
|
eAxis = plt.axes([0.7, 0.3, 0.2, 0.03], facecolor=axcolor)
|
|
self.eSlider = Slider(eAxis, r'e', np.min(self.eArray), np.max(self.eArray), self.eArray[25])
|
|
self.eSlider.on_changed(self.updateParam)
|
|
|
|
def updateComputation(self): # The computation, parameter dependant
|
|
#first deal with abonnements
|
|
mask= self.status=='RS'
|
|
self.abo= self.abop*np.ones((self.nbHab,))
|
|
self.abo[self.status=='RS']=self.abos
|
|
|
|
#second deal with consumption and total prices
|
|
self.prixConso=np.zeros(self.vv.shape)
|
|
self.prixp=np.zeros(self.vv.shape)
|
|
self.prixs=np.zeros(self.vv.shape)
|
|
if np.sum(self.abo)>=self.recettes:
|
|
self.p0=0
|
|
self.prixp = self.abop
|
|
self.prixs = self.abos
|
|
else:
|
|
#find p0 to balance income.
|
|
alpha1=np.zeros((self.nbHab,))# préfacteur de p_0 dans la tranche 1
|
|
alpha2=np.zeros((self.nbHab,))# préfacteur de p_0 dans la tranche 2
|
|
beta2=np.zeros((self.nbHab,))# constante dans la tranche 2
|
|
|
|
#compute consumption integral on all inhabitants
|
|
for ih in range(self.nbHab):
|
|
alpha1[ih],alpha2[ih],beta2[ih]=self.computeIntegrals(self.volume[ih])
|
|
|
|
self.p0=(self.recettes-np.sum(beta2+self.abo))/np.sum(alpha1+alpha2) # prix d'inflexion
|
|
self.prixProj = (alpha1+alpha2)*self.p0+beta2 #Calcul du prix payé par les habitantes dans le nouveau scénario
|
|
|
|
#compute consumption price per m3
|
|
#tranche 1
|
|
self.vv[0:self.nbpts]=self.vinf*((1-3*self.b)*(self.tt**3)+3*self.b*(self.tt**2))
|
|
self.prixConso[0:self.nbpts]=self.p0*((3*self.a-2)*(self.tt**3)+(-6*self.a+3)*(self.tt**2)+3*self.a*self.tt)
|
|
#tranche 2
|
|
self.vv[self.nbpts:]=self.vinf+(self.vmax-self.vinf)*((3*(self.c+self.d-self.c*self.d)-2)*(self.tt**3)+\
|
|
3*(1-2*self.c-self.d+self.c*self.d)*(self.tt**2)+3*self.c*self.tt)
|
|
self.prixConso[self.nbpts:]=self.p0+(self.pmax-self.p0)*((1-3*self.e)*(self.tt**3)+3*self.e*(self.tt**2))
|
|
|
|
#compute full consumption price (integral of price per m3)
|
|
alpha1=np.zeros(self.vv.shape)# préfacteur de p_0 dans la tranche 1
|
|
alpha2=np.zeros(self.vv.shape)# préfacteur de p_0 dans la tranche 2
|
|
beta2=np.zeros(self.vv.shape)# constante dans la tranche 2
|
|
for iv,vv in enumerate(self.vv):
|
|
alpha1[iv],alpha2[iv],beta2[iv]=self.computeIntegrals(vv)
|
|
self.prixp = self.abop+(alpha1+alpha2)*self.p0+beta2
|
|
self.prixs = self.abos+(alpha1+alpha2)*self.p0+beta2
|
|
|
|
#now compute prices
|
|
self.prixp_m3 = self.prixp/self.vv
|
|
self.prixs_m3 = self.prixs/self.vv
|
|
|
|
def computeIntegrals(self,vv):#return alpha1, alpha2 and beta for an input volume
|
|
a=self.a
|
|
b=self.b
|
|
c=self.c
|
|
d=self.d
|
|
e=self.e
|
|
if vv<=self.vinf:# on ne travaille que dans la tranche 1.
|
|
#Find the value of T by solving the equation V=v(T)
|
|
if vv==0: #remove extrema points where root finding can be problematic
|
|
T=0.0
|
|
elif vv==self.vinf:
|
|
T=1.0
|
|
else:
|
|
p = [1-3*b, 3*b, 0, -vv/self.vinf]# polynomial representation of the equation
|
|
roots = np.roots(p)
|
|
roots = np.unique(roots)#remove duplicates
|
|
roots2 = np.real(roots[np.isreal(roots)]) # take only real roots
|
|
mask = (roots2<=1.) & (roots2>=0.)#check that T is real and between 0 and 1
|
|
T=float(roots2[mask])
|
|
alpha1=3*self.vinf*(T**6/6*(-9*a*b+3*a+6*b-2)+T**5/5*(24*a*b-6*a-13*b+3)+3*T**4/4*(-7*a*b+a+2*b)+T**3/3*6*a*b)
|
|
return alpha1,0,0
|
|
else:
|
|
alpha1=3*self.vinf*(1/6*(-9*a*b+3*a+6*b-2)+1/5*(24*a*b-6*a-13*b+3)+3/4*(-7*a*b+a+2*b)+1/3*6*a*b)# tranche 1 avec T=1
|
|
#tranche 2
|
|
wmax=self.vmax-self.vinf
|
|
#Find the value of T by solving the equation V-vinf=w(T)
|
|
if vv==self.vinf: #remove extrema points where root finding can be problematic
|
|
T=0.0
|
|
elif vv==self.vmax:
|
|
T=1.0
|
|
else:
|
|
p = [3*(c+d-c*d)-2, 3*(1-2*c-d+c*d), 3*c, -(vv-self.vinf)/wmax]
|
|
roots = np.roots(p)
|
|
roots = np.unique(roots)#remove duplicates
|
|
roots2 = np.real(roots[np.isreal(roots)]) # take only real roots
|
|
mask = (roots2<=1.) & (roots2>=0.)#check that T is real and between 0 and 1#
|
|
if not mask.any(): # for a weird reason, if vv=vmax, we have roots2=1.0 but it does not pass roots2<=1.0 ????
|
|
print(vv,roots,roots2,mask)
|
|
T=float(np.real(roots2[mask]))
|
|
uu=(-3*c*d+9*e*c*d+3*c-9*e*c+3*d-9*e*d+6*e -2)*T**6/6+(2*c*d-15*e*c*d-4*c+21*e*c-2*d+15*e*d-12*e+2)*T**5/5 +(6*e*c*d+c-15*e*c-6*e*d+6*e)*T**4/4+(3*e*c)*T**3/3
|
|
alpha2=vv-self.vinf-3*uu*wmax
|
|
beta2=3*self.pmax*wmax*uu
|
|
return alpha1,alpha2,beta2
|
|
|
|
def updatePlot(self): # The plot update
|
|
self.ax1.plot(self.vv,self.prixp,label='RP/PRO')
|
|
self.ax1.plot(self.vv,self.prixs,label='RS')
|
|
self.ax1.scatter(self.volume,self.prix2018,s=3,color='g',label='2018')
|
|
self.ax2.plot(self.vv,self.prixp_m3,label='RP/PRO')
|
|
self.ax2.plot(self.vv,self.prixs_m3,label='RS')
|
|
self.ax2.plot(self.vv,self.prixConso,'k-',label='Conso')
|
|
self.ax2.plot([self.vinf,self.vinf],[0,self.p0],color = '0.75')
|
|
self.ax2.plot([0,self.vinf],[self.p0,self.p0],color = '0.75')
|
|
self.ax2.scatter(self.volume,self.prix2018_m3,s=3,color='g',label='2018')
|
|
|
|
self.ax1.set_xlim(self.axx)
|
|
self.ax1.set_ylim(self.ax1y)
|
|
self.ax2.set_ylim(self.ax2y)
|
|
#self.ax2.text(0.9, 0.2, '$p_0=${:.2f} €'.format(self.p0),ha='right', va='bottom', transform=self.ax2.transAxes)
|
|
self.ax2.legend()
|
|
self.ax2.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax1.set_ylabel('Facture totale (€)')
|
|
self.ax2.set_ylabel('Prix au m3 (€)')
|
|
plt.tight_layout()
|
|
plt.subplots_adjust(top = 0.9,right = 0.5)
|
|
|
|
def updateParam(self,val): # Update the parameter using the slider values
|
|
#get current values of ax limits
|
|
self.axx=self.ax1.get_xlim()
|
|
self.ax1y=self.ax1.get_ylim()
|
|
self.ax2y=self.ax2.get_ylim()
|
|
self.ax1.clear()
|
|
self.ax2.clear()
|
|
self.abop= self.abopSlider.val
|
|
self.abos= self.abosSlider.val
|
|
self.recettes= self.recettesSlider.val
|
|
self.vinf=self.vinfSlider.val
|
|
self.a=self.aSlider.val
|
|
self.b=self.bSlider.val
|
|
self.c=self.cSlider.val
|
|
self.d=self.dSlider.val
|
|
self.e=self.eSlider.val
|
|
|
|
self.updateComputation()
|
|
self.updatePlot() #
|
|
|
|
|
|
class CurrentModel():
|
|
def __init__(self): # Set the static parameters
|
|
|
|
self.vv = np.linspace(0,2100,2101) # volume values
|
|
self.vv[0]=1e-5 #little trick to avoid divide by 0
|
|
self.abopArray = np.linspace(0,200,201) # potential values for abop
|
|
self.abosArray = np.linspace(0,200,201) # potential values for abos
|
|
self.recettesArray = np.linspace (50000,100000,51) # potential values for recettes
|
|
|
|
#loading the inhabitant data
|
|
book = xlrd.open_workbook('Eau2018.xls')
|
|
sheet = book.sheet_by_name('CALCULS')
|
|
self.nbHab=363
|
|
self.volume = np.array([sheet.cell_value(r,4) for r in range(1,self.nbHab+1)])#get the volume used in 2018 in m3
|
|
self.vTot=np.sum(self.volume)
|
|
self.volume[self.volume==0]=1e-5 #little trick to avoid divide by 0
|
|
self.status = np.array([sheet.cell_value(r,3) for r in range(1,self.nbHab+1)])#get the RS, RP, PRO status
|
|
self.prix2018 = np.array([sheet.cell_value(r,33) for r in range(1,self.nbHab+1)])#get the price paid in 2018 in EUR
|
|
self.prix2018_m3 = self.prix2018 / self.volume
|
|
|
|
|
|
def mainFunction(self):
|
|
'''
|
|
main function, the one called in the notebook, and that update computations depending on the slider values.
|
|
'''
|
|
self.fig = plt.figure(figsize=(8,5))
|
|
self.ax1 = plt.subplot(121)
|
|
self.ax2 = plt.subplot(122,sharex=self.ax1)
|
|
self.ax1.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax2.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax1.set_ylabel('Facture totale (€)')
|
|
self.ax2.set_ylabel('Prix au m3 (€)')
|
|
|
|
# initial values
|
|
self.abop= self.abopArray[100]
|
|
self.abos= self.abosArray[100]
|
|
self.recettes= self.recettesArray[25]
|
|
|
|
# initial plot
|
|
self.updateComputation()
|
|
self.updatePlot()
|
|
|
|
# SLIDERS
|
|
axcolor = 'lightgoldenrodyellow'
|
|
abopAxis = plt.axes([0.2, 0.15, 0.65, 0.05], facecolor=axcolor)
|
|
self.abopSlider = Slider(abopAxis, r'Abo. P/PRO (EUR)', np.min(self.abopArray), np.max(self.abopArray), self.abopArray[100])
|
|
self.abopSlider.on_changed(self.updateParam)
|
|
abosAxis = plt.axes([0.2, 0.1, 0.65, 0.05], facecolor=axcolor)
|
|
self.abosSlider = Slider(abosAxis, r'Abo. S (EUR)', np.min(self.abosArray), np.max(self.abosArray), self.abosArray[100])
|
|
self.abosSlider.on_changed(self.updateParam)
|
|
recettesAxis = plt.axes([0.2, 0.05, 0.65, 0.05], facecolor=axcolor)
|
|
self.recettesSlider = Slider(recettesAxis, r'Recettes (EUR)', np.min(self.recettesArray), np.max(self.recettesArray), self.recettesArray[25])
|
|
self.recettesSlider.on_changed(self.updateParam)
|
|
|
|
def updateComputation(self): # The computation, parameter dependant
|
|
#first deal with abonnements
|
|
mask= self.status=='RS'
|
|
self.abo= self.abop*np.ones((self.nbHab,))
|
|
self.abo[self.status=='RS']=self.abos
|
|
|
|
#second deal with consumption
|
|
if np.sum(self.abo)>=self.recettes:
|
|
self.p0=0
|
|
else:
|
|
self.p0= (self.recettes-np.sum(self.abo))/self.vTot
|
|
|
|
#now deal with prices
|
|
self.prixp = self.abop+self.p0*self.vv
|
|
self.prixs = self.abos+self.p0*self.vv
|
|
self.prixp_m3 = self.abop/self.vv+self.p0
|
|
self.prixs_m3 = self.abos/self.vv+self.p0
|
|
|
|
def updatePlot(self): # The plot update
|
|
self.ax1.plot(self.vv,self.prixp,label='RP/PRO')
|
|
self.ax1.plot(self.vv,self.prixs,label='RS')
|
|
self.ax1.scatter(self.volume,self.prix2018,s=3,color='g',label='2018')
|
|
|
|
self.ax2.plot(self.vv,self.prixp_m3,label='RP/PRO')
|
|
self.ax2.plot(self.vv,self.prixs_m3,label='RS')
|
|
self.ax2.scatter(self.volume,self.prix2018_m3,s=3,color='g',label='2018')
|
|
self.ax2.set_ylim(0,20)
|
|
self.ax2.text(0.9, 0.2, '$p_0=${:.2f} €'.format(self.p0),ha='right', va='bottom', transform=self.ax2.transAxes)
|
|
self.ax2.legend()
|
|
|
|
self.ax1.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax2.set_xlabel(r'volume ($m^3$) ')
|
|
self.ax1.set_ylabel('Facture totale (€)')
|
|
self.ax2.set_ylabel('Prix au m3 (€)')
|
|
plt.tight_layout()
|
|
plt.subplots_adjust(top = 0.9,bottom = 0.35)
|
|
|
|
def updateParam(self,val): # Update the parameter using the slider values
|
|
self.ax1.clear()
|
|
self.ax2.clear()
|
|
self.abop= self.abopSlider.val
|
|
self.abos= self.abosSlider.val
|
|
self.recettes= self.recettesSlider.val
|
|
self.updateComputation()
|
|
self.updatePlot() #
|
|
|