This commit is contained in:
2026-04-08 20:27:50 +02:00
parent 371cd17bbf
commit 3230af667b
10 changed files with 45061 additions and 6 deletions

View File

@@ -0,0 +1,356 @@
#!/usr/bin/env python3
"""
PrestaShop orphan product images audit/backup/delete helper.
Workflow:
1) backup - detect orphan image IDs, create remote tar.gz with full folder structure, download locally
2) dry-run - show what would be removed (without deleting anything)
3) delete - delete only after explicit --confirm-delete YES
Requirements:
- Python 3.9+
- paramiko (`pip install paramiko`)
- SSH credentials in .env (SSH_HOST, SSH_PORT, SSH_LOGIN, SSH_PASSWORD)
"""
from __future__ import annotations
import argparse
import datetime as dt
import shlex
import sys
from pathlib import Path
from typing import Dict, Tuple
import paramiko
# Remote PrestaShop root directory; "{ssh_login}" is filled in from SSH_LOGIN (.env).
REMOTE_PROJECT_PATH = "/home/{ssh_login}/ftp/migracja/public_html/wyczarujprezent.pl"
# Remote scratch directory that receives the scan output files and the backup archive.
REMOTE_WORKDIR = "/home/{ssh_login}/ftp/migracja/.orphan_cleanup"
# Manifest file name inside REMOTE_WORKDIR; must match the name written by the PHP scan.
REMOTE_MANIFEST = "orphan_paths.txt"
# File-name prefix of the backup archive; a timestamp and ".tar.gz" are appended.
REMOTE_TAR_PREFIX = "orphan_images_backup"
def load_env(path: Path) -> Dict[str, str]:
    """Parse a simple ``KEY=VALUE`` .env file into a dictionary.

    Blank lines, ``#`` comment lines and lines without ``=`` are skipped.
    A leading ``export `` on the key is dropped. A value wrapped in exactly
    one pair of matching quotes (single or double) is unwrapped; quote
    characters that belong to the value itself are preserved.

    Args:
        path: Location of the .env file.

    Returns:
        Mapping of keys to their string values (later duplicates win).

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Brak pliku .env: {path}")
    env: Dict[str, str] = {}
    # "utf-8-sig" drops a BOM that would otherwise become part of the first key.
    for raw in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        value = value.strip()
        # Unwrap only one *matching* pair of quotes; blindly stripping both
        # quote kinds would eat quote characters that are part of a password.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        env[key] = value
    return env
def ssh_connect(env: Dict[str, str]) -> paramiko.SSHClient:
    """Open a password-authenticated SSH connection described by *env*.

    Args:
        env: Mapping with SSH_HOST, SSH_LOGIN, SSH_PASSWORD and optionally
            SSH_PORT (defaults to 22 when missing or empty).

    Returns:
        A connected ``paramiko.SSHClient``; the caller must close it.

    Raises:
        ValueError: If a required key is missing/empty or SSH_PORT is not
            an integer.
    """
    host = env.get("SSH_HOST")
    login = env.get("SSH_LOGIN")
    password = env.get("SSH_PASSWORD")
    if not (host and login and password):
        raise ValueError("Brakuje SSH_HOST / SSH_LOGIN / SSH_PASSWORD w .env")

    # An empty "SSH_PORT=" line falls back to the default instead of crashing.
    raw_port = env.get("SSH_PORT") or "22"
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"Nieprawidlowa wartosc SSH_PORT w .env: {raw_port!r}") from None

    client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification. Kept for backward compatibility; consider loading known
    # hosts and rejecting unknown keys for hosts that matter.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname=host, port=port, username=login, password=password, timeout=30)
    except Exception:
        # Do not leak the half-initialised client when the connection fails.
        client.close()
        raise
    return client
def run_ssh(client: paramiko.SSHClient, command: str, timeout: int = 3600) -> Tuple[str, str, int]:
    """Execute *command* over the open SSH connection and collect its result.

    Args:
        client: Connected SSH client.
        command: Shell command line to run remotely.
        timeout: Value forwarded to ``exec_command`` as its ``timeout``.

    Returns:
        ``(stdout, stderr, exit_status)``; both streams are decoded as UTF-8
        with undecodable bytes replaced.
    """
    _stdin, remote_out, remote_err = client.exec_command(command, timeout=timeout)
    # stdout is drained completely first, then stderr, then the exit status.
    decoded = [
        stream.read().decode("utf-8", errors="replace")
        for stream in (remote_out, remote_err)
    ]
    exit_status = remote_out.channel.recv_exit_status()
    return decoded[0], decoded[1], exit_status
def remote_scan_command(project_path: str, workdir: str) -> str:
    """Build the ``php -r`` shell command that scans ``img/p`` for orphans.

    The embedded PHP script reads the database settings from
    ``app/config/parameters.php``, compares image IDs found on disk with the
    ``<prefix>image`` table and writes two files into *workdir*:

    - ``orphan_ids.txt``: image IDs present on disk but missing in the DB,
    - ``orphan_paths.txt``: every file (original + derivatives) of those IDs,
      relative to *project_path*.

    It prints ``ORPHAN_IDS=``, ``ORPHAN_FILES=`` and ``MANIFEST=`` lines on
    stdout and exits non-zero when an output file cannot be opened.

    Args:
        project_path: Remote PrestaShop root directory.
        workdir: Remote directory for the output files (created if missing).

    Returns:
        A single shell command line, safely quoted for a POSIX shell.
    """

    def php_single_quoted(value: str) -> str:
        # Escape for a PHP single-quoted literal: only \ and ' are special.
        return value.replace("\\", "\\\\").replace("'", "\\'")

    php_script = r"""
$root = '%PROJECT_PATH%';
$work = '%WORKDIR%';
@mkdir($work, 0775, true);
$params = include $root . '/app/config/parameters.php';
$p = $params['parameters'];
$prefix = $p['database_prefix'];
$dsn = sprintf('mysql:host=%s;port=%s;dbname=%s;charset=utf8mb4', $p['database_host'], $p['database_port'], $p['database_name']);
$pdo = new PDO($dsn, $p['database_user'], $p['database_password'], [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]);
$dbIds = [];
$stmt = $pdo->query("SELECT id_image FROM {$prefix}image");
while ($row = $stmt->fetch(PDO::FETCH_NUM)) {
    $dbIds[(int)$row[0]] = true;
}
$imgRoot = $root . '/img/p';
$diskOriginalIds = [];
$it = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($imgRoot, FilesystemIterator::SKIP_DOTS));
foreach ($it as $f) {
    if (!$f->isFile()) continue;
    $name = $f->getFilename();
    if (preg_match('/^(\d+)\.(jpe?g|png|webp|avif)$/i', $name, $m)) {
        $diskOriginalIds[(int)$m[1]] = true;
    }
}
$orphans = [];
foreach ($diskOriginalIds as $id => $_) {
    if (!isset($dbIds[$id])) $orphans[$id] = true;
}
$orphanIdPath = $work . '/orphan_ids.txt';
$orphanPathsPath = $work . '/orphan_paths.txt';
$hId = fopen($orphanIdPath, 'w');
if ($hId === false) {
    fwrite(STDERR, 'Cannot open for writing: ' . $orphanIdPath . PHP_EOL);
    exit(1);
}
foreach (array_keys($orphans) as $id) {
    fwrite($hId, $id . PHP_EOL);
}
fclose($hId);
$countFiles = 0;
$hPaths = fopen($orphanPathsPath, 'w');
if ($hPaths === false) {
    fwrite(STDERR, 'Cannot open for writing: ' . $orphanPathsPath . PHP_EOL);
    exit(1);
}
$it2 = new RecursiveIteratorIterator(new RecursiveDirectoryIterator($imgRoot, FilesystemIterator::SKIP_DOTS));
foreach ($it2 as $f) {
    if (!$f->isFile()) continue;
    $name = $f->getFilename();
    if (!preg_match('/^(\d+)(?:-.*)?\.(jpe?g|png|webp|avif)$/i', $name, $m)) continue;
    $id = (int)$m[1];
    if (!isset($orphans[$id])) continue;
    $full = $f->getPathname();
    if (strpos($full, $root . '/') === 0) {
        $rel = substr($full, strlen($root) + 1);
    } else {
        $rel = ltrim($full, '/');
    }
    fwrite($hPaths, $rel . PHP_EOL);
    $countFiles++;
}
fclose($hPaths);
echo 'ORPHAN_IDS=' . count($orphans) . PHP_EOL;
echo 'ORPHAN_FILES=' . $countFiles . PHP_EOL;
echo 'MANIFEST=' . $orphanPathsPath . PHP_EOL;
"""
    php_script = php_script.replace("%PROJECT_PATH%", php_single_quoted(project_path))
    php_script = php_script.replace("%WORKDIR%", php_single_quoted(workdir))
    # shlex.quote delivers the script to PHP verbatim. Backslashes must NOT be
    # doubled: inside shell single quotes they are already literal, and
    # doubling them would break PHP escapes such as \' in the paths above.
    return f"php -r {shlex.quote(php_script)}"
def parse_key_values(text: str) -> Dict[str, str]:
    """Collect ``KEY=VALUE`` lines from *text* into a dictionary.

    Lines without ``=`` are ignored. Each remaining line is stripped and
    split at its first ``=``; when a key repeats, the last value wins.
    """
    pairs = (
        entry.strip().split("=", 1)
        for entry in text.splitlines()
        if "=" in entry
    )
    return {name: value for name, value in pairs}
def ensure_remote_scan(client: paramiko.SSHClient, project_path: str, workdir: str) -> Dict[str, str]:
    """Run the remote orphan scan and return its parsed summary.

    Args:
        client: Connected SSH client.
        project_path: Remote PrestaShop root directory.
        workdir: Remote directory that receives the scan output files.

    Returns:
        The ``KEY=VALUE`` pairs printed by the scan; guaranteed to contain
        ``ORPHAN_IDS`` and ``ORPHAN_FILES``.

    Raises:
        RuntimeError: If the remote command exits non-zero or its output
            lacks one of the required keys.
    """
    scan_cmd = remote_scan_command(project_path=project_path, workdir=workdir)
    stdout_text, stderr_text, exit_code = run_ssh(client, scan_cmd, timeout=5400)
    if exit_code != 0:
        raise RuntimeError(f"Remote scan failed.\nSTDOUT:\n{stdout_text}\nSTDERR:\n{stderr_text}")

    summary = parse_key_values(stdout_text)
    required = ("ORPHAN_IDS", "ORPHAN_FILES")
    if not all(key in summary for key in required):
        raise RuntimeError(f"Nie udalo sie sparsowac wyniku skanu.\n{stdout_text}")
    return summary
def cmd_backup(args: argparse.Namespace) -> int:
    """Archive all orphan image files remotely and download the archive.

    Runs the remote scan, packs every path from the manifest into a
    timestamped ``tar.gz`` inside the remote work directory and downloads
    it into ``args.output_dir``.

    Args:
        args: Parsed CLI arguments; reads ``env_file`` and ``output_dir``.

    Returns:
        0 on success (including the case where there is nothing to archive).

    Raises:
        ValueError: If SSH_LOGIN is missing from the .env file.
        RuntimeError: If the remote scan or the remote ``tar`` fails.
    """
    env = load_env(Path(args.env_file))
    ssh_login = env.get("SSH_LOGIN")
    if not ssh_login:
        # Fail with the same readable message ssh_connect uses instead of a bare KeyError.
        raise ValueError("Brakuje SSH_HOST / SSH_LOGIN / SSH_PASSWORD w .env")
    project_path = REMOTE_PROJECT_PATH.format(ssh_login=ssh_login)
    workdir = REMOTE_WORKDIR.format(ssh_login=ssh_login)
    local_backup_dir = Path(args.output_dir).resolve()
    local_backup_dir.mkdir(parents=True, exist_ok=True)
    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        orphan_ids = int(scan["ORPHAN_IDS"])
        orphan_files = int(scan["ORPHAN_FILES"])
        if orphan_files == 0:
            print("Brak osieroconych plikow do archiwizacji.")
            return 0
        ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        tar_name = f"{REMOTE_TAR_PREFIX}_{ts}.tar.gz"
        remote_tar = f"{workdir}/{tar_name}"
        manifest = f"{workdir}/{REMOTE_MANIFEST}"
        # Paths in the manifest are relative to the project root, hence the cd.
        tar_cmd = (
            f"set -e; "
            f"mkdir -p {shlex.quote(workdir)}; "
            f"cd {shlex.quote(project_path)}; "
            f"test -s {shlex.quote(manifest)}; "
            f"tar -czf {shlex.quote(remote_tar)} -T {shlex.quote(manifest)}"
        )
        out, err, code = run_ssh(client, tar_cmd, timeout=5400)
        if code != 0:
            raise RuntimeError(f"Nie udalo sie utworzyc archiwum.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
        local_tar = local_backup_dir / tar_name
        try:
            with client.open_sftp() as sftp:
                sftp.get(remote_tar, str(local_tar))
        except BaseException:
            # A truncated archive must never be mistaken for a valid backup
            # before a later delete run, so remove the partial download.
            local_tar.unlink(missing_ok=True)
            raise
        print(f"ORPHAN_IDS={orphan_ids}")
        print(f"ORPHAN_FILES={orphan_files}")
        print(f"REMOTE_TAR={remote_tar}")
        print(f"LOCAL_TAR={local_tar}")
        print("Status: backup utworzony i pobrany.")
        return 0
    finally:
        client.close()
def cmd_dry_run(args: argparse.Namespace) -> int:
    """Show the scan summary and a sample of manifest paths; delete nothing.

    Args:
        args: Parsed CLI arguments; reads ``env_file`` and ``sample``.

    Returns:
        0 on success.

    Raises:
        ValueError: If SSH_LOGIN is missing or the scan counters are not
            integers.
        RuntimeError: If the remote scan or the remote listing fails.
    """
    env = load_env(Path(args.env_file))
    ssh_login = env.get("SSH_LOGIN")
    if not ssh_login:
        # Fail with the same readable message ssh_connect uses instead of a bare KeyError.
        raise ValueError("Brakuje SSH_HOST / SSH_LOGIN / SSH_PASSWORD w .env")
    project_path = REMOTE_PROJECT_PATH.format(ssh_login=ssh_login)
    workdir = REMOTE_WORKDIR.format(ssh_login=ssh_login)
    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        # The counters come from remote output: force them to integers before
        # they are interpolated into another shell command.
        orphan_ids = int(scan["ORPHAN_IDS"])
        orphan_files = int(scan["ORPHAN_FILES"])
        # A negative count would make `head -n` print "all but the last N" lines.
        sample = max(0, int(args.sample))
        manifest = f"{workdir}/{REMOTE_MANIFEST}"
        show_cmd = (
            "set -e; "
            "echo '--- PODSUMOWANIE ---'; "
            f"echo ORPHAN_IDS={orphan_ids}; "
            f"echo ORPHAN_FILES={orphan_files}; "
            f"echo {shlex.quote('MANIFEST=' + manifest)}; "
            f"echo '--- PRZYKLADOWE PLIKI (max {sample}) ---'; "
            f"head -n {sample} {shlex.quote(manifest)} || true"
        )
        out, err, code = run_ssh(client, show_cmd, timeout=120)
        if code != 0:
            raise RuntimeError(f"Dry-run failed.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
        print(out.rstrip())
        if err.strip():
            print("\nSTDERR:")
            print(err.rstrip())
        print("\nStatus: nic nie zostalo usuniete.")
        return 0
    finally:
        client.close()
def cmd_delete(args: argparse.Namespace) -> int:
    """Delete the orphan image files listed in the freshly generated manifest.

    Nothing is touched unless ``--confirm-delete YES`` was given. The remote
    script aborts before deleting anything when the manifest is missing or
    empty, contains a path outside ``img/p/<digits>/...``, or has a different
    number of lines than the scan just reported.

    Args:
        args: Parsed CLI arguments; reads ``confirm_delete`` and ``env_file``.

    Returns:
        2 when confirmation is missing, otherwise 0.

    Raises:
        ValueError: If SSH_LOGIN is missing from the .env file.
        RuntimeError: If the remote scan or the remote delete script fails.
    """
    if args.confirm_delete != "YES":
        print("Przerwano: aby usunac pliki, podaj --confirm-delete YES")
        return 2
    env = load_env(Path(args.env_file))
    ssh_login = env.get("SSH_LOGIN")
    if not ssh_login:
        # Fail with the same readable message ssh_connect uses instead of a bare KeyError.
        raise ValueError("Brakuje SSH_HOST / SSH_LOGIN / SSH_PASSWORD w .env")
    project_path = REMOTE_PROJECT_PATH.format(ssh_login=ssh_login)
    workdir = REMOTE_WORKDIR.format(ssh_login=ssh_login)
    manifest = f"{workdir}/{REMOTE_MANIFEST}"
    client = ssh_connect(env)
    try:
        scan = ensure_remote_scan(client, project_path, workdir)
        expected = int(scan["ORPHAN_FILES"])
        if expected == 0:
            print("Brak osieroconych plikow do usuniecia.")
            return 0
        q_project = shlex.quote(project_path)
        q_manifest = shlex.quote(manifest)
        # Safety checks:
        # - manifest must exist and be non-empty
        # - delete only paths under img/p
        # - manifest line count must equal the count reported by the scan that
        #   just ran, so a stale or partially written manifest is never used
        # - count before/after to report real deletion number
        delete_cmd = (
            "set -eu; "
            f"cd {q_project}; "
            f"test -s {q_manifest}; "
            "if grep -Ev '^img/p/[0-9/]+/[^/]+$' "
            f"{q_manifest} >/dev/null; then "
            "echo 'SafetyError: wykryto sciezki poza img/p lub w nieoczekiwanym formacie'; exit 9; fi; "
            f"before=$(wc -l < {q_manifest}); "
            f"if [ \"$before\" -ne {expected} ]; then "
            "echo 'SafetyError: liczba wpisow w manifescie nie zgadza sie z wynikiem skanu'; exit 10; fi; "
            "deleted=0; "
            "while IFS= read -r rel; do "
            "if [ -f \"$rel\" ]; then rm -f -- \"$rel\" && deleted=$((deleted+1)); fi; "
            f"done < {q_manifest}; "
            "after_existing=0; "
            "while IFS= read -r rel; do "
            "if [ -f \"$rel\" ]; then after_existing=$((after_existing+1)); fi; "
            f"done < {q_manifest}; "
            "echo BEFORE_LISTED=$before; "
            "echo DELETED=$deleted; "
            "echo STILL_EXISTS_FROM_LIST=$after_existing"
        )
        out, err, code = run_ssh(client, delete_cmd, timeout=7200)
        if code != 0:
            raise RuntimeError(f"Delete failed.\nSTDOUT:\n{out}\nSTDERR:\n{err}")
        print(out.rstrip())
        if err.strip():
            print("\nSTDERR:")
            print(err.rstrip())
        print("\nStatus: usuwanie zakonczone.")
        return 0
    finally:
        client.close()
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with its three subcommands.

    The global ``--env-file`` option is followed by a required subcommand
    (``backup``, ``dry-run`` or ``delete``). Each subcommand carries one
    option of its own and stores its handler in ``func``.
    """
    root = argparse.ArgumentParser(
        description="Backup i czyszczenie osieroconych zdjec produktow PrestaShop (img/p)."
    )
    root.add_argument(
        "--env-file",
        default=".env",
        help="Sciezka do pliku .env z danymi SSH (domyslnie: .env)",
    )
    commands = root.add_subparsers(dest="command", required=True)

    # (subcommand name, summary, handler, option flag, option keyword arguments)
    command_table = (
        (
            "backup",
            "Spakuj osierocone pliki i pobierz archiwum lokalnie",
            cmd_backup,
            "--output-dir",
            {
                "default": "./backups",
                "help": "Katalog lokalny na pobrane archiwum (domyslnie: ./backups)",
            },
        ),
        (
            "dry-run",
            "Pokaz co byloby usuniete, bez kasowania",
            cmd_dry_run,
            "--sample",
            {
                "default": 30,
                "type": int,
                "help": "Ile przykladowych sciezek wyswietlic (domyslnie: 30)",
            },
        ),
        (
            "delete",
            "Usun osierocone pliki z manifestu (wymaga potwierdzenia)",
            cmd_delete,
            "--confirm-delete",
            {
                "default": "NO",
                "help": "Aby wykonac usuwanie podaj dokladnie: YES",
            },
        ),
    )
    for name, summary, handler, flag, flag_options in command_table:
        subparser = commands.add_parser(name, help=summary)
        subparser.add_argument(flag, **flag_options)
        subparser.set_defaults(func=handler)
    return root
def main() -> int:
    """Parse the command line, run the selected subcommand, return its exit code.

    Any exception raised by the subcommand is reported on stderr as
    ``ERROR: <message>`` and turned into exit code 1.
    """
    namespace = build_parser().parse_args()
    try:
        exit_code = int(namespace.func(namespace))
    except Exception as error:
        print(f"ERROR: {error}", file=sys.stderr)
        return 1
    return exit_code
# Script entry point: propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())