170 lines
6.7 KiB
Python
170 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
import os
|
|
import csv
|
|
import pandas as pd
|
|
import tabula
|
|
|
|
# Mots-clés utilisés pour repérer les zones à supprimer
|
|
MOT_DEBUT = "SOLDE" # Supprimer jusqu'à et y compris cette ligne
|
|
MOT_FIN = "Total des mouvements" # Supprimer cette ligne et toutes les suivantes
|
|
MOT_DATE = "date" # Lignes à ignorer si 1er champ contient ce mot
|
|
|
|
# -------------------------------------------------------------------
|
|
# Détecte automatiquement le séparateur d'un CSV à partir d'un échantillon
|
|
# -------------------------------------------------------------------
|
|
def detect_delimiter(sample_text):
|
|
try:
|
|
dialect = csv.Sniffer().sniff(sample_text, delimiters=",;|\t")
|
|
return dialect.delimiter
|
|
except Exception:
|
|
# Fallback : choix heuristique FR (beaucoup de CSV utilisent ;)
|
|
return ";" if sample_text.count(";") >= sample_text.count(",") else ","
|
|
|
|
# -------------------------------------------------------------------
|
|
# Nettoie un CSV brut issu de Tabula selon les règles demandées
|
|
# -------------------------------------------------------------------
|
|
def nettoyer_csv_texte(csv_in_path, csv_out_path):
|
|
# Lecture brute du fichier texte (pas encore DataFrame)
|
|
with open(csv_in_path, "r", encoding="utf-8", errors="replace") as f:
|
|
lines = f.readlines()
|
|
|
|
# 1 Supprimer les 3 premières lignes quoi qu'il arrive
|
|
lines = lines[3:]
|
|
|
|
# 2 Chercher la première ligne contenant MOT_DEBUT (SOLDE)
|
|
idx_debut = None
|
|
for i, line in enumerate(lines):
|
|
if MOT_DEBUT.lower() in line.lower():
|
|
idx_debut = i
|
|
break
|
|
if idx_debut is not None:
|
|
# Supprimer tout jusqu'à et y compris cette ligne
|
|
lines = lines[idx_debut + 1:]
|
|
|
|
# 3 Supprimer à partir de la ligne contenant MOT_FIN (Total des mouvements)
|
|
idx_fin = None
|
|
for i, line in enumerate(lines):
|
|
if MOT_FIN.lower() in line.lower():
|
|
idx_fin = i
|
|
break
|
|
if idx_fin is not None:
|
|
lines = lines[:idx_fin]
|
|
|
|
# 4 Détection du séparateur sur un échantillon
|
|
sample = "".join(lines[:20])
|
|
delim = detect_delimiter(sample)
|
|
|
|
# 5 Lecture en mode tolérant avec csv.reader
|
|
reader = csv.reader(lines, delimiter=delim)
|
|
rows = [row for row in reader]
|
|
|
|
# 6 Normaliser le nombre de colonnes (éviter erreurs si certaines lignes sont plus courtes)
|
|
max_cols = max(len(r) for r in rows) if rows else 0
|
|
rows = [r + [""] * (max_cols - len(r)) for r in rows]
|
|
|
|
# 7 Supprimer les lignes dont le premier champ contient "date" (sauf on garde une copie pour l'entête globale)
|
|
header_line = None
|
|
filtered_rows = []
|
|
for r in rows:
|
|
first_col = (r[0] or "").strip().lower()
|
|
if MOT_DATE in first_col:
|
|
if header_line is None:
|
|
header_line = r[:] # garder pour l'entête globale
|
|
continue # ne pas inclure cette ligne dans ce fichier final
|
|
filtered_rows.append(r)
|
|
|
|
# 8 Fusionner les lignes dont la première colonne est vide avec la précédente
|
|
merged = []
|
|
for r in filtered_rows:
|
|
if (r[0] or "").strip() == "" and merged:
|
|
prev = merged[-1]
|
|
if len(prev) < len(r):
|
|
prev += [""] * (len(r) - len(prev))
|
|
for i in range(len(r)):
|
|
if r[i].strip():
|
|
prev[i] = (prev[i] + " " + r[i]).strip() if prev[i] else r[i].strip()
|
|
else:
|
|
merged.append(r)
|
|
|
|
# 9 Nettoyer les deux dernières colonnes : supprimer les points dans les nombres
|
|
if merged:
|
|
for r in merged:
|
|
if len(r) >= 2:
|
|
# Dernière colonne
|
|
r[-1] = r[-1].replace(".", "")
|
|
if len(r) >= 3:
|
|
# Avant-dernière colonne
|
|
r[-2] = r[-2].replace(".", "")
|
|
|
|
# 10 Sauvegarde du fichier nettoyé
|
|
with open(csv_out_path, "w", encoding="utf-8", newline="") as f:
|
|
writer = csv.writer(f, delimiter=delim)
|
|
writer.writerows(merged)
|
|
|
|
return header_line # Retourne l'entête détectée pour usage ultérieur
|
|
|
|
# -------------------------------------------------------------------
|
|
# Convertit un PDF avec Tabula puis nettoie le CSV
|
|
# -------------------------------------------------------------------
|
|
def convertir_et_nettoyer(pdf_path, out_dir="/data"):
|
|
base = os.path.splitext(os.path.basename(pdf_path))[0]
|
|
csv_brut = os.path.join(out_dir, f"{base}_brut.csv")
|
|
csv_final = os.path.join(out_dir, f"{base}_final.csv")
|
|
|
|
# Conversion PDF -> CSV brut
|
|
tabula.convert_into(pdf_path, csv_brut, output_format="csv", pages="all", lattice=True)
|
|
print(f"[✓] Converti : {pdf_path}")
|
|
|
|
# Nettoyage du CSV et récupération de l'entête éventuelle
|
|
header_line = nettoyer_csv_texte(csv_brut, csv_final)
|
|
print(f"[✓] Nettoyé : {csv_final}")
|
|
return csv_final, header_line
|
|
|
|
# -------------------------------------------------------------------
|
|
# Fusionne plusieurs CSV en un seul avec ajout de l'entête globale
|
|
# -------------------------------------------------------------------
|
|
def fusionner_csv(liste_csv, fichier_sortie, header_line):
|
|
if not liste_csv:
|
|
print("Aucun CSV à fusionner.")
|
|
return
|
|
dfs = []
|
|
for csv_file in liste_csv:
|
|
try:
|
|
df = pd.read_csv(csv_file, header=None)
|
|
dfs.append(df)
|
|
except Exception as e:
|
|
print(f"[!] Erreur lecture {csv_file} : {e}")
|
|
|
|
if dfs:
|
|
final_df = pd.concat(dfs, ignore_index=True)
|
|
# Ajout de l'entête en première ligne
|
|
if header_line:
|
|
header_df = pd.DataFrame([header_line])
|
|
final_df = pd.concat([header_df, final_df], ignore_index=True)
|
|
final_df.to_csv(fichier_sortie, index=False, header=False)
|
|
print(f"[✓] Fichier fusionné : {fichier_sortie}")
|
|
|
|
# -------------------------------------------------------------------
|
|
# Traitement complet : conversion, nettoyage, fusion
|
|
# -------------------------------------------------------------------
|
|
def traitement_batch(workdir="/data"):
|
|
pdfs = sorted([f for f in os.listdir(workdir) if f.lower().endswith(".pdf")])
|
|
if not pdfs:
|
|
print("Aucun PDF trouvé.")
|
|
return
|
|
|
|
fichiers_finaux = []
|
|
header_global = None
|
|
for pdf in pdfs:
|
|
final_csv, header_line = convertir_et_nettoyer(os.path.join(workdir, pdf), workdir)
|
|
fichiers_finaux.append(final_csv)
|
|
if header_line and header_global is None:
|
|
header_global = header_line # on garde le premier trouvé
|
|
|
|
# Fusion de tous les fichiers nettoyés en un seul
|
|
fusionner_csv(fichiers_finaux, os.path.join(workdir, "fusion_total.csv"), header_global)
|
|
|
|
# -------------------------------------------------------------------
|
|
if __name__ == "__main__":
|
|
traitement_batch("/data")
|