180 lines
5.8 KiB
Python
180 lines
5.8 KiB
Python
"""
|
|
Pobieranie zdjec od klientow z poczty IMAP.
|
|
|
|
Wspolny modul wykorzystywany przez generatory PSD ktore potrzebuja zdjecia
|
|
od klienta (np. akrylowe podziekowania). Lokalizuje maile od podanego adresu
|
|
nadawcy w ostatnich N dniach i pobiera pierwszy zalacznik graficzny.
|
|
|
|
Wymagane zmienne srodowiskowe (.env w katalogu projektu):
|
|
EMAIL_01_HOST - host serwera (ten sam dla SMTP i IMAP)
|
|
EMAIL_01_USERNAME - login (adres email skrzynki)
|
|
EMAIL_01_PASSWORD - haslo
|
|
EMAIL_01_IMAP_PORT - opcjonalnie, domyslnie 993 (standard IMAP SSL)
|
|
EMAIL_01_IMAP_FOLDER - opcjonalnie, domyslnie INBOX
|
|
|
|
Uzycie programowe:
|
|
from email_photo_fetcher import fetch_customer_photo
|
|
path = fetch_customer_photo("klient@example.com", "/tmp", days_back=60)
|
|
|
|
Uzycie z CLI (do testow):
|
|
python email_photo_fetcher.py --email klient@example.com --out C:/tmp
|
|
"""
|
|
|
|
import argparse
|
|
import email
|
|
import imaplib
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from email.header import decode_header
|
|
|
|
|
|
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".heic", ".webp", ".tif", ".tiff", ".bmp"}
|
|
|
|
|
|
def _project_root():
|
|
return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
|
|
|
|
|
|
def _load_env():
|
|
"""Lekka obsluga .env (KEY=VALUE), bez zaleznosci od python-dotenv."""
|
|
env_path = os.path.join(_project_root(), ".env")
|
|
if not os.path.isfile(env_path):
|
|
return {}
|
|
out = {}
|
|
with open(env_path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
k, _, v = line.partition("=")
|
|
v = v.strip().strip('"').strip("'")
|
|
out[k.strip()] = v
|
|
return out
|
|
|
|
|
|
def _imap_config(account_prefix="EMAIL_01"):
|
|
env = _load_env()
|
|
return {
|
|
"host": env.get(f"{account_prefix}_HOST"),
|
|
"port": int(env.get(f"{account_prefix}_IMAP_PORT", "993")),
|
|
"user": env.get(f"{account_prefix}_USERNAME"),
|
|
"password": env.get(f"{account_prefix}_PASSWORD"),
|
|
"folder": env.get(f"{account_prefix}_IMAP_FOLDER", "INBOX"),
|
|
}
|
|
|
|
|
|
def _decode_filename(raw_name):
|
|
if not raw_name:
|
|
return ""
|
|
parts = decode_header(raw_name)
|
|
out = []
|
|
for chunk, enc in parts:
|
|
if isinstance(chunk, bytes):
|
|
try:
|
|
out.append(chunk.decode(enc or "utf-8", errors="replace"))
|
|
except LookupError:
|
|
out.append(chunk.decode("utf-8", errors="replace"))
|
|
else:
|
|
out.append(chunk)
|
|
return "".join(out)
|
|
|
|
|
|
def _safe_filename(name):
|
|
name = re.sub(r"[\\/:*?\"<>|]+", "_", name).strip()
|
|
return name or "attachment"
|
|
|
|
|
|
def _is_image(filename):
|
|
ext = os.path.splitext(filename)[1].lower()
|
|
return ext in IMAGE_EXTENSIONS
|
|
|
|
|
|
def fetch_customer_photo(customer_email, output_dir, days_back=60):
|
|
"""Szuka maili od customer_email z ostatnich `days_back` dni i pobiera
|
|
pierwszy zalacznik graficzny do output_dir. Zwraca sciezke lub None."""
|
|
cfg = _imap_config()
|
|
if not cfg["host"] or not cfg["user"] or not cfg["password"]:
|
|
raise RuntimeError(
|
|
"Brak konfiguracji konta w .env — wymagane: EMAIL_01_HOST, EMAIL_01_USERNAME, EMAIL_01_PASSWORD"
|
|
)
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
imap = imaplib.IMAP4_SSL(cfg["host"], cfg["port"])
|
|
|
|
try:
|
|
imap.login(cfg["user"], cfg["password"])
|
|
imap.select(cfg["folder"])
|
|
|
|
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
|
|
criteria = f'(FROM "{customer_email}" SINCE "{since}")'
|
|
status, data = imap.search(None, criteria)
|
|
if status != "OK" or not data or not data[0]:
|
|
return None
|
|
|
|
ids = data[0].split()
|
|
# Iterujemy od najnowszego do najstarszego
|
|
for msg_id in reversed(ids):
|
|
status, msg_data = imap.fetch(msg_id, "(RFC822)")
|
|
if status != "OK" or not msg_data or not msg_data[0]:
|
|
continue
|
|
msg = email.message_from_bytes(msg_data[0][1])
|
|
|
|
for part in msg.walk():
|
|
if part.get_content_maintype() == "multipart":
|
|
continue
|
|
disposition = str(part.get("Content-Disposition") or "")
|
|
filename = _decode_filename(part.get_filename())
|
|
|
|
if not filename:
|
|
continue
|
|
if "attachment" not in disposition.lower() and "inline" not in disposition.lower():
|
|
continue
|
|
if not _is_image(filename):
|
|
continue
|
|
|
|
payload = part.get_payload(decode=True)
|
|
if not payload:
|
|
continue
|
|
|
|
safe = _safe_filename(filename)
|
|
stamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
|
local_name = f"{customer_email}_{stamp}_{safe}"
|
|
local_path = os.path.join(output_dir, local_name)
|
|
with open(local_path, "wb") as f:
|
|
f.write(payload)
|
|
return local_path
|
|
|
|
return None
|
|
finally:
|
|
try:
|
|
imap.logout()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Pobierz zdjecie klienta z poczty IMAP")
|
|
parser.add_argument("--email", required=True, help="Adres email klienta (nadawca)")
|
|
parser.add_argument("--out", required=True, help="Katalog wyjsciowy dla zdjecia")
|
|
parser.add_argument("--days", type=int, default=60, help="Ile dni wstecz szukac (domyslnie 60)")
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
path = fetch_customer_photo(args.email, args.out, days_back=args.days)
|
|
except Exception as exc:
|
|
print(f"BLAD: {exc}")
|
|
sys.exit(2)
|
|
|
|
if path:
|
|
print(f"OK: {path}")
|
|
else:
|
|
print("BRAK: nie znaleziono zdjecia w mailach od tego klienta")
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|