Files
pdf2csv/convert.py
2025-10-12 02:10:57 +02:00

488 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Convertisseur de PDF en CSV avec nettoyage automatique.
Conçu pour traiter des relevés bancaires et autres tableaux PDF.
"""
import os
import csv
import argparse
import logging
import tempfile
from pathlib import Path
from typing import List, Optional, Tuple
from contextlib import contextmanager
import pandas as pd
import tabula
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class Configuration:
"""Configuration centralisée de l'application."""
def __init__(self):
# Mots-clés pour le nettoyage (configurables via variables d'environnement)
self.MOT_DEBUT = os.getenv("MOT_DEBUT", "SOLDE")
self.MOT_FIN = os.getenv("MOT_FIN", "Total des mouvements")
self.MOT_DATE = os.getenv("MOT_DATE", "date")
# Options de traitement
self.SKIP_LINES = int(os.getenv("SKIP_LINES", "3"))
self.CLEAN_TEMP_FILES = os.getenv("CLEAN_TEMP_FILES", "true").lower() == "true"
# Options Tabula
self.TABULA_LATTICE = os.getenv("TABULA_LATTICE", "true").lower() == "true"
self.TABULA_PAGES = os.getenv("TABULA_PAGES", "all")
def __repr__(self) -> str:
return (f"Configuration(MOT_DEBUT={self.MOT_DEBUT}, MOT_FIN={self.MOT_FIN}, "
f"MOT_DATE={self.MOT_DATE}, SKIP_LINES={self.SKIP_LINES})")
@contextmanager
def temporary_file_tracker():
"""Gestionnaire de contexte pour suivre et nettoyer les fichiers temporaires."""
temp_files: List[Path] = []
try:
yield temp_files
finally:
for temp_file in temp_files:
try:
if temp_file.exists():
temp_file.unlink()
logger.debug(f"Fichier temporaire supprimé : {temp_file}")
except Exception as e:
logger.warning(f"Impossible de supprimer {temp_file}: {e}")
def valider_fichier_pdf(pdf_path: Path) -> bool:
"""
Valide qu'un fichier PDF existe et est accessible.
Args:
pdf_path: Chemin vers le fichier PDF
Returns:
True si le fichier est valide, False sinon
"""
if not pdf_path.exists():
logger.error(f"Le fichier PDF n'existe pas : {pdf_path}")
return False
if not pdf_path.is_file():
logger.error(f"Le chemin n'est pas un fichier : {pdf_path}")
return False
if pdf_path.stat().st_size == 0:
logger.error(f"Le fichier PDF est vide : {pdf_path}")
return False
if not pdf_path.suffix.lower() == '.pdf':
logger.warning(f"L'extension n'est pas .pdf : {pdf_path}")
return True
def detect_delimiter(sample_text: str) -> str:
"""
Détecte automatiquement le séparateur d'un CSV à partir d'un échantillon.
Args:
sample_text: Échantillon de texte CSV
Returns:
Le délimiteur détecté
"""
try:
dialect = csv.Sniffer().sniff(sample_text, delimiters=",;|\t")
delimiter = dialect.delimiter
logger.debug(f"Délimiteur détecté : '{delimiter}'")
return delimiter
except Exception as e:
logger.debug(f"Impossible de détecter le délimiteur automatiquement : {e}")
# Fallback : choix heuristique FR (beaucoup de CSV utilisent ;)
delimiter = ";" if sample_text.count(";") >= sample_text.count(",") else ","
logger.debug(f"Délimiteur par défaut utilisé : '{delimiter}'")
return delimiter
def supprimer_lignes_par_mot_cle(lines: List[str], mot_cle: str,
mode: str = 'jusqu_a') -> List[str]:
"""
Supprime des lignes basées sur un mot-clé.
Args:
lines: Liste de lignes
mot_cle: Mot-clé à rechercher
mode: 'jusqu_a' (supprime jusqu'à la ligne incluse)
ou 'depuis' (supprime à partir de la ligne)
Returns:
Liste de lignes filtrées
"""
idx = None
for i, line in enumerate(lines):
if mot_cle.lower() in line.lower():
idx = i
break
if idx is None:
logger.debug(f"Mot-clé '{mot_cle}' non trouvé (mode: {mode})")
return lines
if mode == 'jusqu_a':
logger.debug(f"Suppression de {idx + 1} lignes jusqu'à '{mot_cle}'")
return lines[idx + 1:]
elif mode == 'depuis':
logger.debug(f"Suppression de {len(lines) - idx} lignes depuis '{mot_cle}'")
return lines[:idx]
else:
raise ValueError(f"Mode inconnu : {mode}")
def nettoyer_csv_texte(csv_in_path: Path, csv_out_path: Path,
config: Configuration) -> Optional[List[str]]:
"""
Nettoie un CSV brut issu de Tabula selon les règles configurées.
Args:
csv_in_path: Chemin du CSV d'entrée
csv_out_path: Chemin du CSV de sortie
config: Configuration de l'application
Returns:
L'en-tête détectée ou None
"""
try:
# Lecture brute du fichier texte
with open(csv_in_path, "r", encoding="utf-8", errors="strict") as f:
lines = f.readlines()
logger.debug(f"Lecture de {len(lines)} lignes depuis {csv_in_path}")
# 1. Supprimer les N premières lignes
if config.SKIP_LINES > 0:
lines = lines[config.SKIP_LINES:]
logger.debug(f"Sauté {config.SKIP_LINES} premières lignes")
# 2. Supprimer jusqu'au mot de début (inclus)
lines = supprimer_lignes_par_mot_cle(lines, config.MOT_DEBUT, mode='jusqu_a')
# 3. Supprimer à partir du mot de fin (inclus)
lines = supprimer_lignes_par_mot_cle(lines, config.MOT_FIN, mode='depuis')
if not lines:
logger.warning(f"Aucune ligne restante après nettoyage pour {csv_in_path}")
return None
# 4. Détection du séparateur
sample = "".join(lines[:min(20, len(lines))])
delim = detect_delimiter(sample)
# 5. Lecture avec csv.reader
reader = csv.reader(lines, delimiter=delim)
rows = list(reader)
# 6. Normaliser le nombre de colonnes
if rows:
max_cols = max(len(r) for r in rows)
rows = [r + [""] * (max_cols - len(r)) for r in rows]
logger.debug(f"Normalisation à {max_cols} colonnes")
# 7. Filtrer les lignes contenant le mot "date" dans la première colonne
header_line = None
filtered_rows = []
for r in rows:
first_col = (r[0] or "").strip().lower()
if config.MOT_DATE in first_col:
if header_line is None:
header_line = r[:] # Garder pour l'en-tête globale
logger.debug(f"En-tête détectée : {header_line[:3]}...")
continue
filtered_rows.append(r)
# 8. Fusionner les lignes dont la première colonne est vide
merged = []
for r in filtered_rows:
if (r[0] or "").strip() == "" and merged:
prev = merged[-1]
if len(prev) < len(r):
prev += [""] * (len(r) - len(prev))
for i in range(len(r)):
if r[i].strip():
prev[i] = (prev[i] + " " + r[i]).strip() if prev[i] else r[i].strip()
else:
merged.append(r)
logger.debug(f"Fusion résultante : {len(filtered_rows)} -> {len(merged)} lignes")
# 9. Nettoyer les deux dernières colonnes (supprimer les points)
if merged:
for r in merged:
if len(r) >= 2:
r[-1] = r[-1].replace(".", "")
if len(r) >= 3:
r[-2] = r[-2].replace(".", "")
# 10. Sauvegarde
with open(csv_out_path, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f, delimiter=delim)
writer.writerows(merged)
logger.info(f"CSV nettoyé sauvegardé : {csv_out_path} ({len(merged)} lignes)")
return header_line
except UnicodeDecodeError as e:
logger.error(f"Erreur d'encodage lors de la lecture de {csv_in_path}: {e}")
raise
except Exception as e:
logger.error(f"Erreur lors du nettoyage de {csv_in_path}: {e}")
raise
def convertir_et_nettoyer(pdf_path: Path, out_dir: Path, config: Configuration,
temp_files: Optional[List[Path]] = None) -> Tuple[Path, Optional[List[str]]]:
"""
Convertit un PDF avec Tabula puis nettoie le CSV résultant.
Args:
pdf_path: Chemin du PDF à convertir
out_dir: Répertoire de sortie
config: Configuration de l'application
temp_files: Liste pour suivre les fichiers temporaires
Returns:
Tuple (chemin du CSV final, en-tête détectée)
"""
if not valider_fichier_pdf(pdf_path):
raise ValueError(f"Fichier PDF invalide : {pdf_path}")
base = pdf_path.stem
csv_brut = out_dir / f"{base}_brut.csv"
csv_final = out_dir / f"{base}_final.csv"
if temp_files is not None and config.CLEAN_TEMP_FILES:
temp_files.append(csv_brut)
try:
# Conversion PDF -> CSV brut
logger.info(f"Conversion de {pdf_path.name}...")
tabula.convert_into(
str(pdf_path),
str(csv_brut),
output_format="csv",
pages=config.TABULA_PAGES,
lattice=config.TABULA_LATTICE
)
logger.info(f"✓ Converti : {pdf_path.name}")
# Nettoyage du CSV
header_line = nettoyer_csv_texte(csv_brut, csv_final, config)
logger.info(f"✓ Nettoyé : {csv_final.name}")
return csv_final, header_line
except Exception as e:
logger.error(f"Erreur lors de la conversion de {pdf_path.name}: {e}")
raise
def fusionner_csv(liste_csv: List[Path], fichier_sortie: Path,
header_line: Optional[List[str]]) -> None:
"""
Fusionne plusieurs CSV en un seul avec ajout de l'en-tête.
Args:
liste_csv: Liste des fichiers CSV à fusionner
fichier_sortie: Chemin du fichier de sortie
header_line: En-tête à ajouter en première ligne
"""
if not liste_csv:
logger.warning("Aucun CSV à fusionner.")
return
logger.info(f"Fusion de {len(liste_csv)} fichier(s) CSV...")
dfs = []
for csv_file in liste_csv:
try:
df = pd.read_csv(csv_file, header=None)
dfs.append(df)
logger.debug(f"Chargé : {csv_file.name} ({len(df)} lignes)")
except Exception as e:
logger.error(f"Erreur lors de la lecture de {csv_file}: {e}")
# Continue avec les autres fichiers
if not dfs:
logger.error("Aucun fichier CSV n'a pu être lu.")
return
try:
final_df = pd.concat(dfs, ignore_index=True)
# Ajout de l'en-tête en première ligne
if header_line:
header_df = pd.DataFrame([header_line])
final_df = pd.concat([header_df, final_df], ignore_index=True)
logger.debug("En-tête ajoutée au fichier fusionné")
final_df.to_csv(fichier_sortie, index=False, header=False)
logger.info(f"✓ Fichier fusionné créé : {fichier_sortie} ({len(final_df)} lignes)")
except Exception as e:
logger.error(f"Erreur lors de la fusion des CSV : {e}")
raise
def traitement_batch(workdir: Path, config: Configuration) -> None:
"""
Traitement complet : conversion, nettoyage et fusion de tous les PDFs.
Args:
workdir: Répertoire de travail contenant les PDFs
config: Configuration de l'application
"""
if not workdir.exists():
logger.error(f"Le répertoire n'existe pas : {workdir}")
raise FileNotFoundError(f"Répertoire introuvable : {workdir}")
if not workdir.is_dir():
logger.error(f"Le chemin n'est pas un répertoire : {workdir}")
raise NotADirectoryError(f"Pas un répertoire : {workdir}")
# Recherche des PDFs
pdfs = sorted(workdir.glob("*.pdf")) + sorted(workdir.glob("*.PDF"))
if not pdfs:
logger.warning(f"Aucun fichier PDF trouvé dans {workdir}")
return
logger.info(f"Trouvé {len(pdfs)} fichier(s) PDF à traiter")
logger.info(f"Configuration : {config}")
# Traitement avec gestion des fichiers temporaires
with temporary_file_tracker() as temp_files:
fichiers_finaux = []
header_global = None
erreurs = 0
for pdf in pdfs:
try:
final_csv, header_line = convertir_et_nettoyer(pdf, workdir, config, temp_files)
fichiers_finaux.append(final_csv)
if header_line and header_global is None:
header_global = header_line
except Exception as e:
logger.error(f"Échec du traitement de {pdf.name} : {e}")
erreurs += 1
# Continue avec les autres fichiers
# Fusion finale
if fichiers_finaux:
try:
fusionner_csv(fichiers_finaux, workdir / "fusion_total.csv", header_global)
except Exception as e:
logger.error(f"Échec de la fusion finale : {e}")
erreurs += 1
# Résumé
logger.info(f"\n{'='*60}")
logger.info(f"Traitement terminé :")
logger.info(f" - Fichiers traités avec succès : {len(fichiers_finaux)}/{len(pdfs)}")
logger.info(f" - Erreurs : {erreurs}")
logger.info(f"{'='*60}")
def main():
"""Point d'entrée principal avec gestion des arguments CLI."""
parser = argparse.ArgumentParser(
description="Convertit des fichiers PDF en CSV avec nettoyage automatique",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exemples:
%(prog)s /data # Traite tous les PDFs dans /data
%(prog)s /data --verbose # Mode verbeux
%(prog)s /data --mot-debut BALANCE # Personnalise le mot de début
Variables d'environnement:
MOT_DEBUT Mot-clé de début (défaut: "SOLDE")
MOT_FIN Mot-clé de fin (défaut: "Total des mouvements")
MOT_DATE Mot-clé date (défaut: "date")
SKIP_LINES Lignes à sauter au début (défaut: 3)
CLEAN_TEMP_FILES Nettoyer les fichiers temporaires (défaut: true)
"""
)
parser.add_argument(
"workdir",
type=Path,
nargs="?",
default=Path("/data"),
help="Répertoire contenant les fichiers PDF (défaut: /data)"
)
parser.add_argument(
"-v", "--verbose",
action="store_true",
help="Active le mode verbeux (DEBUG)"
)
parser.add_argument(
"--mot-debut",
type=str,
help="Mot-clé de début (surcharge MOT_DEBUT)"
)
parser.add_argument(
"--mot-fin",
type=str,
help="Mot-clé de fin (surcharge MOT_FIN)"
)
parser.add_argument(
"--no-clean",
action="store_true",
help="Conserve les fichiers temporaires"
)
args = parser.parse_args()
# Configuration du niveau de log
if args.verbose:
logger.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
# Création de la configuration
config = Configuration()
# Surcharges depuis les arguments CLI
if args.mot_debut:
config.MOT_DEBUT = args.mot_debut
if args.mot_fin:
config.MOT_FIN = args.mot_fin
if args.no_clean:
config.CLEAN_TEMP_FILES = False
# Lancement du traitement
try:
traitement_batch(args.workdir, config)
except KeyboardInterrupt:
logger.warning("\nInterruption par l'utilisateur")
return 1
except Exception as e:
logger.error(f"Erreur fatale : {e}", exc_info=args.verbose)
return 1
return 0
if __name__ == "__main__":
exit(main())