#!/usr/bin/env python3
"""
PrestaShop orphan product images audit/backup/delete helper.

Workflow:
1) backup  - detect orphan image IDs, create remote tar.gz with full folder
             structure, download locally
2) dry-run - show what would be removed (without deleting anything)
3) delete  - delete only after explicit --confirm-delete YES

Requirements:
- Python 3.9+
- paramiko (`pip install paramiko`)
- SSH credentials in .env (SSH_HOST, SSH_PORT, SSH_LOGIN, SSH_PASSWORD)
"""

from __future__ import annotations

import argparse
import datetime as dt
import shlex
import sys
from pathlib import Path
from typing import Dict, Tuple

try:
    import paramiko
except ImportError:  # reported with a clear message in ssh_connect()
    paramiko = None  # type: ignore[assignment]

REMOTE_PROJECT_PATH = "/home/{ssh_login}/ftp/migracja/public_html/wyczarujprezent.pl"
REMOTE_WORKDIR = "/home/{ssh_login}/ftp/migracja/.orphan_cleanup"
REMOTE_MANIFEST = "orphan_paths.txt"
REMOTE_TAR_PREFIX = "orphan_images_backup"


def _strip_matching_quotes(value: str) -> str:
    """Remove one pair of identical surrounding quotes, if present."""
    if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
        return value[1:-1]
    return value


def load_env(path: Path) -> Dict[str, str]:
    """Parse a simple ``KEY=VALUE`` .env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Only the first ``=`` splits key from value. A value wrapped in
    one matching pair of single or double quotes is unwrapped; quote
    characters that are merely part of the value (e.g. a password ending in
    ``'``) are preserved.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Brak pliku .env: {path}")

    env: Dict[str, str] = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        env[key.strip()] = _strip_matching_quotes(value.strip())
    return env


def ssh_connect(env: Dict[str, str]) -> paramiko.SSHClient:
    """Open a password-authenticated SSH connection described by *env*.

    Reads SSH_HOST, SSH_LOGIN, SSH_PASSWORD and the optional SSH_PORT
    (defaults to 22 when missing or empty).

    Raises:
        RuntimeError: if paramiko is not installed.
        ValueError: if a required setting is missing or SSH_PORT is not an
            integer.
    """
    if paramiko is None:
        raise RuntimeError("Brak biblioteki paramiko. Zainstaluj: pip install paramiko")

    host = env.get("SSH_HOST")
    login = env.get("SSH_LOGIN")
    password = env.get("SSH_PASSWORD")
    if not (host and login and password):
        raise ValueError("Brakuje SSH_HOST / SSH_LOGIN / SSH_PASSWORD w .env")

    raw_port = env.get("SSH_PORT") or "22"  # treat "SSH_PORT=" like "unset"
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"Nieprawidlowy SSH_PORT w .env: {raw_port!r}") from None

    client = paramiko.SSHClient()
    # SECURITY NOTE: AutoAddPolicy accepts host keys that are not yet known
    # without verifying them. Kept for backward compatibility; consider
    # pinning the host key if this runs over untrusted networks.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname=host, port=port, username=login, password=password, timeout=30)
    except Exception:
        client.close()  # do not leak the client when the connection fails
        raise
    return client


def run_ssh(client: paramiko.SSHClient, command: str, timeout: int = 3600) -> Tuple[str, str, int]:
    """Run *command* on the remote host and return ``(stdout, stderr, exit_code)``.

    Both streams are decoded as UTF-8 with undecodable bytes replaced.
    *timeout* is passed straight to ``client.exec_command``.
    """
    _, stdout, stderr = client.exec_command(command, timeout=timeout)
    # NOTE(review): stdout is drained to EOF before stderr is read; a command
    # emitting a very large amount of stderr could presumably stall here -
    # confirm against paramiko's channel buffering if that ever matters.
    out = stdout.read().decode("utf-8", errors="replace")
    err = stderr.read().decode("utf-8", errors="replace")
    code = stdout.channel.recv_exit_status()
    return out, err, code


def _php_single_quoted(value: str) -> str:
    """Escape *value* for use inside a PHP single-quoted string literal."""
    return value.replace("\\", "\\\\").replace("'", "\\'")


def remote_scan_command(project_path: str, workdir: str) -> str:
    """Build the ``php -r ...`` shell command that scans for orphan images.

    The embedded PHP scans img/p, compares with the ``{prefix}image`` table
    and writes into *workdir*:
    - orphan_ids.txt   (image IDs existing on disk but missing in DB)
    - orphan_paths.txt (all files for orphan IDs: original + derivatives,
                        relative to *project_path*)
    It prints ``ORPHAN_IDS=``, ``ORPHAN_FILES=`` and ``MANIFEST=`` lines.

    Both paths are escaped for the PHP string literal they are placed in,
    and the whole script is quoted for a POSIX shell.
    """
    php_script = r"""
$root = '%PROJECT_PATH%';
$work = '%WORKDIR%';
@mkdir($work, 0775, true);

$params = include $root . '/app/config/parameters.php';
$p = $params['parameters'];
$prefix = $p['database_prefix'];
$dsn = sprintf('mysql:host=%s;port=%s;dbname=%s;charset=utf8mb4', $p['database_host'], $p['database_port'], $p['database_name']);
$pdo = new PDO($dsn, $p['database_user'], $p['database_password'], [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);

$dbIds = [];
$stmt = $pdo->query("SELECT id_image FROM {$prefix}image");
while ($row = $stmt->fetch(PDO::FETCH_NUM)) {
    $dbIds[(int)$row[0]] = true;
}

$imgRoot = $root . '/img/p';
$diskOriginalIds = [];
$it = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($imgRoot, FilesystemIterator::SKIP_DOTS));
foreach ($it as $f) {
    if (!$f->isFile()) continue;
    $name = $f->getFilename();
    if (preg_match('/^(\d+)\.(jpe?g|png|webp|avif)$/i', $name, $m)) {
        $diskOriginalIds[(int)$m[1]] = true;
    }
}

$orphans = [];
foreach ($diskOriginalIds as $id => $_) {
    if (!isset($dbIds[$id])) $orphans[$id] = true;
}

$orphanIdPath = $work . '/orphan_ids.txt';
$orphanPathsPath = $work . '/orphan_paths.txt';

$hId = fopen($orphanIdPath, 'w');
foreach (array_keys($orphans) as $id) {
    fwrite($hId, $id . PHP_EOL);
}
fclose($hId);

$countFiles = 0;
$hPaths = fopen($orphanPathsPath, 'w');
$it2 = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($imgRoot, FilesystemIterator::SKIP_DOTS));
foreach ($it2 as $f) {
    if (!$f->isFile()) continue;
    $name = $f->getFilename();
    if (!preg_match('/^(\d+)(?:-.*)?\.(jpe?g|png|webp|avif)$/i', $name, $m)) continue;
    $id = (int)$m[1];
    if (!isset($orphans[$id])) continue;
    $full = $f->getPathname();
    if (strpos($full, $root . '/') === 0) {
        $rel = substr($full, strlen($root) + 1);
    } else {
        $rel = ltrim($full, '/');
    }
    fwrite($hPaths, $rel . PHP_EOL);
    $countFiles++;
}
fclose($hPaths);

echo 'ORPHAN_IDS=' . count($orphans) . PHP_EOL;
echo 'ORPHAN_FILES=' . $countFiles . PHP_EOL;
echo 'MANIFEST=' . $orphanPathsPath . PHP_EOL;
"""
    php_script = php_script.replace("%PROJECT_PATH%", _php_single_quoted(project_path))
    php_script = php_script.replace("%WORKDIR%", _php_single_quoted(workdir))
    # Shell single quotes keep backslashes literal, so the script only needs
    # standard shell quoting - no extra backslash doubling.
    return f"php -r {shlex.quote(php_script)}"


def parse_key_values(text: str) -> Dict[str, str]:
    """Collect ``KEY=VALUE`` lines from *text*; lines without ``=`` are ignored.

    Each line is stripped before it is split on the first ``=``. A later
    occurrence of a key overwrites an earlier one.
    """
    result: Dict[str, str] = {}
    for line in text.splitlines():
        if "=" not in line:
            continue
        key, value = line.strip().split("=", 1)
        result[key] = value
    return result


def ensure_remote_scan(client: paramiko.SSHClient, project_path: str, workdir: str) -> Dict[str, str]:
    """Run the remote orphan scan and return its parsed ``KEY=VALUE`` output.

    The result is guaranteed to contain ``ORPHAN_IDS`` and ``ORPHAN_FILES``
    as strings of ASCII digits, so callers can convert them with ``int()``
    or embed them in a shell command safely.

    Raises:
        RuntimeError: if the remote command exits non-zero or its output
            lacks valid counters.
    """
    cmd = remote_scan_command(project_path=project_path, workdir=workdir)
    out, err, code = run_ssh(client, cmd, timeout=5400)
    if code != 0:
        raise RuntimeError(f"Remote scan failed.\nSTDOUT:\n{out}\nSTDERR:\n{err}")

    data = parse_key_values(out)
    for key in ("ORPHAN_IDS", "ORPHAN_FILES"):
        value = data.get(key, "")
        if not (value.isascii() and value.isdigit()):
            raise RuntimeError(f"Nie udalo sie sparsowac wyniku skanu.\n{out}")
    return data


def _remote_paths(env: Dict[str, str]) -> Tuple[str, str]:
    """Return ``(project_path, workdir)`` on the remote host for SSH_LOGIN."""
    ssh_login = env.get("SSH_LOGIN")
    if not ssh_login:
        raise ValueError("Brakuje SSH_LOGIN w .env")
    return (
        REMOTE_PROJECT_PATH.format(ssh_login=ssh_login),
        REMOTE_WORKDIR.format(ssh_login=ssh_login),
    )


def cmd_backup(args: argparse.Namespace) -> int:
    """Scan for orphans, tar them on the remote host and download the archive.

    Reads ``args.env_file`` and ``args.output_dir``. Returns 0 both when a
    backup was created and when there was nothing to archive.
    """
    env = load_env(Path(args.env_file))
    project_path, workdir = _remote_paths(env)

    local_backup_dir = Path(args.output_dir).resolve()
    local_backup_dir.mkdir(parents=True, exist_ok=True)

    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        orphan_ids = int(scan["ORPHAN_IDS"])
        orphan_files = int(scan["ORPHAN_FILES"])

        if orphan_files == 0:
            print("Brak osieroconych plikow do archiwizacji.")
            return 0

        ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        tar_name = f"{REMOTE_TAR_PREFIX}_{ts}.tar.gz"
        remote_tar = f"{workdir}/{tar_name}"
        manifest = f"{workdir}/{REMOTE_MANIFEST}"

        # Manifest paths are relative to the project root, hence the `cd`.
        tar_cmd = (
            f"set -e; "
            f"mkdir -p {shlex.quote(workdir)}; "
            f"cd {shlex.quote(project_path)}; "
            f"test -s {shlex.quote(manifest)}; "
            f"tar -czf {shlex.quote(remote_tar)} -T {shlex.quote(manifest)}"
        )
        out, err, code = run_ssh(client, tar_cmd, timeout=5400)
        if code != 0:
            raise RuntimeError(f"Nie udalo sie utworzyc archiwum.\nSTDOUT:\n{out}\nSTDERR:\n{err}")

        local_tar = local_backup_dir / tar_name
        with client.open_sftp() as sftp:
            sftp.get(remote_tar, str(local_tar))

        print(f"ORPHAN_IDS={orphan_ids}")
        print(f"ORPHAN_FILES={orphan_files}")
        print(f"REMOTE_TAR={remote_tar}")
        print(f"LOCAL_TAR={local_tar}")
        print("Status: backup utworzony i pobrany.")
        return 0
    finally:
        client.close()


def cmd_dry_run(args: argparse.Namespace) -> int:
    """Scan for orphans and print a summary plus a sample of manifest paths.

    Reads ``args.env_file`` and ``args.sample`` (negative values are treated
    as 0). Nothing is deleted.
    """
    env = load_env(Path(args.env_file))
    project_path, workdir = _remote_paths(env)
    sample = max(0, int(args.sample))

    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        manifest = f"{workdir}/{REMOTE_MANIFEST}"
        quoted_manifest = shlex.quote(manifest)

        # The scan counters are digit-only strings (see ensure_remote_scan),
        # so embedding them unquoted is safe.
        show_cmd = (
            f"set -e; "
            f"echo '--- PODSUMOWANIE ---'; "
            f"echo ORPHAN_IDS={scan['ORPHAN_IDS']}; "
            f"echo ORPHAN_FILES={scan['ORPHAN_FILES']}; "
            f"echo MANIFEST={quoted_manifest}; "
            f"echo '--- PRZYKLADOWE PLIKI (max {sample}) ---'; "
            f"head -n {sample} {quoted_manifest} || true"
        )
        out, err, code = run_ssh(client, show_cmd, timeout=120)
        if code != 0:
            raise RuntimeError(f"Dry-run failed.\nSTDOUT:\n{out}\nSTDERR:\n{err}")

        print(out.rstrip())
        if err.strip():
            print("\nSTDERR:")
            print(err.rstrip())
        print("\nStatus: nic nie zostalo usuniete.")
        return 0
    finally:
        client.close()


def cmd_delete(args: argparse.Namespace) -> int:
    """Delete the files listed in a freshly generated orphan manifest.

    Requires ``args.confirm_delete == "YES"``; otherwise returns 2 without
    reading the .env file or connecting. Returns 0 after a completed delete
    run or when there is nothing to delete.
    """
    if args.confirm_delete != "YES":
        print("Przerwano: aby usunac pliki, podaj --confirm-delete YES")
        return 2

    env = load_env(Path(args.env_file))
    project_path, workdir = _remote_paths(env)
    manifest = f"{workdir}/{REMOTE_MANIFEST}"
    quoted_manifest = shlex.quote(manifest)

    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        expected = int(scan["ORPHAN_FILES"])
        if expected == 0:
            print("Brak osieroconych plikow do usuniecia.")
            return 0

        # Safety checks:
        # - manifest must exist and be non-empty
        # - delete only paths under img/p
        # - count before/after to report real deletion number
        delete_cmd = (
            "set -eu; "
            f"cd {shlex.quote(project_path)}; "
            f"test -s {quoted_manifest}; "
            "if grep -Ev '^img/p/[0-9/]+/[^/]+$' "
            f"{quoted_manifest} >/dev/null; then "
            "echo 'SafetyError: wykryto sciezki poza img/p lub w nieoczekiwanym formacie'; exit 9; fi; "
            f"before=$(wc -l < {quoted_manifest}); "
            "deleted=0; "
            "while IFS= read -r rel; do "
            "if [ -f \"$rel\" ]; then rm -f -- \"$rel\" && deleted=$((deleted+1)); fi; "
            f"done < {quoted_manifest}; "
            "after_existing=0; "
            "while IFS= read -r rel; do "
            "if [ -f \"$rel\" ]; then after_existing=$((after_existing+1)); fi; "
            f"done < {quoted_manifest}; "
            "echo BEFORE_LISTED=$before; "
            "echo DELETED=$deleted; "
            "echo STILL_EXISTS_FROM_LIST=$after_existing"
        )
        out, err, code = run_ssh(client, delete_cmd, timeout=7200)
        if code != 0:
            raise RuntimeError(f"Delete failed.\nSTDOUT:\n{out}\nSTDERR:\n{err}")

        print(out.rstrip())
        if err.strip():
            print("\nSTDERR:")
            print(err.rstrip())
        print("\nStatus: usuwanie zakonczone.")
        return 0
    finally:
        client.close()


def build_parser() -> argparse.ArgumentParser:
    """Create the CLI parser with the backup / dry-run / delete subcommands."""
    parser = argparse.ArgumentParser(
        description="Backup i czyszczenie osieroconych zdjec produktow PrestaShop (img/p)."
    )
    parser.add_argument(
        "--env-file",
        default=".env",
        help="Sciezka do pliku .env z danymi SSH (domyslnie: .env)",
    )

    sub = parser.add_subparsers(dest="command", required=True)

    p_backup = sub.add_parser("backup", help="Spakuj osierocone pliki i pobierz archiwum lokalnie")
    p_backup.add_argument(
        "--output-dir",
        default="./backups",
        help="Katalog lokalny na pobrane archiwum (domyslnie: ./backups)",
    )
    p_backup.set_defaults(func=cmd_backup)

    p_dry = sub.add_parser("dry-run", help="Pokaz co byloby usuniete, bez kasowania")
    p_dry.add_argument(
        "--sample",
        default=30,
        type=int,
        help="Ile przykladowych sciezek wyswietlic (domyslnie: 30)",
    )
    p_dry.set_defaults(func=cmd_dry_run)

    p_del = sub.add_parser("delete", help="Usun osierocone pliki z manifestu (wymaga potwierdzenia)")
    p_del.add_argument(
        "--confirm-delete",
        default="NO",
        help="Aby wykonac usuwanie podaj dokladnie: YES",
    )
    p_del.set_defaults(func=cmd_delete)

    return parser


def main() -> int:
    """CLI entry point: dispatch to the chosen subcommand.

    Any exception is reported on stderr as ``ERROR: ...`` and mapped to exit
    code 1.
    """
    parser = build_parser()
    args = parser.parse_args()
    try:
        return int(args.func(args))
    except Exception as exc:  # top-level boundary: report and exit non-zero
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())