Files
pdf2csv/convert.py
2025-08-08 20:53:04 +02:00

170 lines
6.8 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import os
import csv
import pandas as pd
import tabula
# Mots-clés utilisés pour repérer les zones à supprimer
MOT_DEBUT = "SOLDE" # Supprimer jusqu'à et y compris cette ligne
MOT_FIN = "Total des mouvements" # Supprimer cette ligne et toutes les suivantes
MOT_DATE = "date" # Lignes à ignorer si 1er champ contient ce mot
# -------------------------------------------------------------------
# Détecte automatiquement le séparateur d'un CSV à partir d'un échantillon
# -------------------------------------------------------------------
def detect_delimiter(sample_text):
try:
dialect = csv.Sniffer().sniff(sample_text, delimiters=",;|\t")
return dialect.delimiter
except Exception:
# Fallback : choix heuristique FR (beaucoup de CSV utilisent ;)
return ";" if sample_text.count(";") >= sample_text.count(",") else ","
# -------------------------------------------------------------------
# Nettoie un CSV brut issu de Tabula selon les règles demandées
# -------------------------------------------------------------------
def nettoyer_csv_texte(csv_in_path, csv_out_path):
# Lecture brute du fichier texte (pas encore DataFrame)
with open(csv_in_path, "r", encoding="utf-8", errors="replace") as f:
lines = f.readlines()
# 1⃣ Supprimer les 3 premières lignes quoi qu'il arrive
lines = lines[3:]
# 2⃣ Chercher la première ligne contenant MOT_DEBUT (SOLDE)
idx_debut = None
for i, line in enumerate(lines):
if MOT_DEBUT.lower() in line.lower():
idx_debut = i
break
if idx_debut is not None:
# Supprimer tout jusqu'à et y compris cette ligne
lines = lines[idx_debut + 1:]
# 3⃣ Supprimer à partir de la ligne contenant MOT_FIN (Total des mouvements)
idx_fin = None
for i, line in enumerate(lines):
if MOT_FIN.lower() in line.lower():
idx_fin = i
break
if idx_fin is not None:
lines = lines[:idx_fin]
# 4⃣ Détection du séparateur sur un échantillon
sample = "".join(lines[:20])
delim = detect_delimiter(sample)
# 5⃣ Lecture en mode tolérant avec csv.reader
reader = csv.reader(lines, delimiter=delim)
rows = [row for row in reader]
# 6⃣ Normaliser le nombre de colonnes (éviter erreurs si certaines lignes sont plus courtes)
max_cols = max(len(r) for r in rows) if rows else 0
rows = [r + [""] * (max_cols - len(r)) for r in rows]
# 7⃣ Supprimer les lignes dont le premier champ contient "date" (sauf on garde une copie pour l'entête globale)
header_line = None
filtered_rows = []
for r in rows:
first_col = (r[0] or "").strip().lower()
if MOT_DATE in first_col:
if header_line is None:
header_line = r[:] # garder pour l'entête globale
continue # ne pas inclure cette ligne dans ce fichier final
filtered_rows.append(r)
# 8⃣ Fusionner les lignes dont la première colonne est vide avec la précédente
merged = []
for r in filtered_rows:
if (r[0] or "").strip() == "" and merged:
prev = merged[-1]
if len(prev) < len(r):
prev += [""] * (len(r) - len(prev))
for i in range(len(r)):
if r[i].strip():
prev[i] = (prev[i] + " " + r[i]).strip() if prev[i] else r[i].strip()
else:
merged.append(r)
# 9⃣ Nettoyer les deux dernières colonnes : supprimer les points dans les nombres
if merged:
for r in merged:
if len(r) >= 2:
# Dernière colonne
r[-1] = r[-1].replace(".", "")
if len(r) >= 3:
# Avant-dernière colonne
r[-2] = r[-2].replace(".", "")
# 🔟 Sauvegarde du fichier nettoyé
with open(csv_out_path, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f, delimiter=delim)
writer.writerows(merged)
return header_line # Retourne l'entête détectée pour usage ultérieur
# -------------------------------------------------------------------
# Convertit un PDF avec Tabula puis nettoie le CSV
# -------------------------------------------------------------------
def convertir_et_nettoyer(pdf_path, out_dir="/data"):
base = os.path.splitext(os.path.basename(pdf_path))[0]
csv_brut = os.path.join(out_dir, f"{base}_brut.csv")
csv_final = os.path.join(out_dir, f"{base}_final.csv")
# Conversion PDF -> CSV brut
tabula.convert_into(pdf_path, csv_brut, output_format="csv", pages="all", lattice=True)
print(f"[✓] Converti : {pdf_path}")
# Nettoyage du CSV et récupération de l'entête éventuelle
header_line = nettoyer_csv_texte(csv_brut, csv_final)
print(f"[✓] Nettoyé : {csv_final}")
return csv_final, header_line
# -------------------------------------------------------------------
# Fusionne plusieurs CSV en un seul avec ajout de l'entête globale
# -------------------------------------------------------------------
def fusionner_csv(liste_csv, fichier_sortie, header_line):
if not liste_csv:
print("Aucun CSV à fusionner.")
return
dfs = []
for csv_file in liste_csv:
try:
df = pd.read_csv(csv_file, header=None)
dfs.append(df)
except Exception as e:
print(f"[!] Erreur lecture {csv_file} : {e}")
if dfs:
final_df = pd.concat(dfs, ignore_index=True)
# Ajout de l'entête en première ligne
if header_line:
header_df = pd.DataFrame([header_line])
final_df = pd.concat([header_df, final_df], ignore_index=True)
final_df.to_csv(fichier_sortie, index=False, header=False)
print(f"[✓] Fichier fusionné : {fichier_sortie}")
# -------------------------------------------------------------------
# Traitement complet : conversion, nettoyage, fusion
# -------------------------------------------------------------------
def traitement_batch(workdir="/data"):
pdfs = sorted([f for f in os.listdir(workdir) if f.lower().endswith(".pdf")])
if not pdfs:
print("Aucun PDF trouvé.")
return
fichiers_finaux = []
header_global = None
for pdf in pdfs:
final_csv, header_line = convertir_et_nettoyer(os.path.join(workdir, pdf), workdir)
fichiers_finaux.append(final_csv)
if header_line and header_global is None:
header_global = header_line # on garde le premier trouvé
# Fusion de tous les fichiers nettoyés en un seul
fusionner_csv(fichiers_finaux, os.path.join(workdir, "fusion_total.csv"), header_global)
# -------------------------------------------------------------------
if __name__ == "__main__":
traitement_batch("/data")