170 lines
5.3 KiB
Python
170 lines
5.3 KiB
Python
"""
|
|
Pobieranie wiadomosci tekstowych od klientow z poczty IMAP.
|
|
|
|
Blizniaczy modul do `email_photo_fetcher.py`, ale zamiast zalacznikow graficznych
|
|
zwraca tresc tekstowa wiadomosci (body) od podanego nadawcy. Uzywany przez silnik
|
|
`/wygeneruj-projekty` aby wykryc dodatkowe poprawki/uwagi ktore klient przysylal
|
|
mailem juz po zlozeniu zamowienia.
|
|
|
|
Skrzynka: EMAIL_01, folder: INBOX (na stale — zgodnie z wymaganiem).
|
|
|
|
Wymagane zmienne srodowiskowe (.env w katalogu projektu):
|
|
EMAIL_01_HOST
|
|
EMAIL_01_USERNAME
|
|
EMAIL_01_PASSWORD
|
|
EMAIL_01_IMAP_PORT - opcjonalnie, domyslnie 993
|
|
|
|
Uzycie programowe:
|
|
from email_message_fetcher import fetch_customer_messages
|
|
items = fetch_customer_messages("klient@example.com", days_back=60)
|
|
# items: list[dict] z kluczami: date, subject, body
|
|
|
|
Uzycie z CLI:
|
|
python email_message_fetcher.py --email klient@example.com --days 60
|
|
"""
|
|
|
|
import argparse
|
|
import email
|
|
import imaplib
|
|
import json
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from email.header import decode_header
|
|
|
|
from email_photo_fetcher import _imap_config # reuse env loader
|
|
|
|
|
|
def _decode_header_value(raw):
|
|
if not raw:
|
|
return ""
|
|
parts = decode_header(raw)
|
|
out = []
|
|
for chunk, enc in parts:
|
|
if isinstance(chunk, bytes):
|
|
try:
|
|
out.append(chunk.decode(enc or "utf-8", errors="replace"))
|
|
except LookupError:
|
|
out.append(chunk.decode("utf-8", errors="replace"))
|
|
else:
|
|
out.append(chunk)
|
|
return "".join(out).strip()
|
|
|
|
|
|
def _extract_text_body(msg):
|
|
"""Zwraca text/plain z wiadomosci; gdy brak — fallback do text/html (bez tagow)."""
|
|
plain_parts = []
|
|
html_parts = []
|
|
|
|
for part in msg.walk():
|
|
if part.get_content_maintype() == "multipart":
|
|
continue
|
|
disposition = str(part.get("Content-Disposition") or "").lower()
|
|
if "attachment" in disposition:
|
|
continue
|
|
|
|
ctype = part.get_content_type()
|
|
payload = part.get_payload(decode=True)
|
|
if not payload:
|
|
continue
|
|
charset = part.get_content_charset() or "utf-8"
|
|
try:
|
|
text = payload.decode(charset, errors="replace")
|
|
except LookupError:
|
|
text = payload.decode("utf-8", errors="replace")
|
|
|
|
if ctype == "text/plain":
|
|
plain_parts.append(text)
|
|
elif ctype == "text/html":
|
|
html_parts.append(text)
|
|
|
|
if plain_parts:
|
|
return "\n".join(p.strip() for p in plain_parts).strip()
|
|
|
|
if html_parts:
|
|
import re
|
|
raw = "\n".join(html_parts)
|
|
raw = re.sub(r"(?is)<(script|style).*?>.*?</\1>", " ", raw)
|
|
raw = re.sub(r"(?s)<[^>]+>", " ", raw)
|
|
raw = re.sub(r"[ \t]+", " ", raw)
|
|
raw = re.sub(r"\s*\n\s*", "\n", raw)
|
|
return raw.strip()
|
|
|
|
return ""
|
|
|
|
|
|
def fetch_customer_messages(customer_email, days_back=60):
|
|
"""Szuka maili od `customer_email` w INBOX z ostatnich `days_back` dni.
|
|
Zwraca liste dict {date, subject, body}, od najnowszego do najstarszego."""
|
|
cfg = _imap_config()
|
|
if not cfg["host"] or not cfg["user"] or not cfg["password"]:
|
|
raise RuntimeError(
|
|
"Brak konfiguracji konta w .env — wymagane: EMAIL_01_HOST, EMAIL_01_USERNAME, EMAIL_01_PASSWORD"
|
|
)
|
|
|
|
imap = imaplib.IMAP4_SSL(cfg["host"], cfg["port"])
|
|
results = []
|
|
|
|
try:
|
|
imap.login(cfg["user"], cfg["password"])
|
|
imap.select("INBOX")
|
|
|
|
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
|
|
criteria = f'(FROM "{customer_email}" SINCE "{since}")'
|
|
status, data = imap.search(None, criteria)
|
|
if status != "OK" or not data or not data[0]:
|
|
return results
|
|
|
|
ids = data[0].split()
|
|
for msg_id in reversed(ids):
|
|
status, msg_data = imap.fetch(msg_id, "(RFC822)")
|
|
if status != "OK" or not msg_data or not msg_data[0]:
|
|
continue
|
|
msg = email.message_from_bytes(msg_data[0][1])
|
|
|
|
subject = _decode_header_value(msg.get("Subject"))
|
|
date_hdr = msg.get("Date") or ""
|
|
body = _extract_text_body(msg)
|
|
|
|
if not body:
|
|
continue
|
|
|
|
results.append({"date": date_hdr, "subject": subject, "body": body})
|
|
|
|
return results
|
|
finally:
|
|
try:
|
|
imap.logout()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def main():
    """Command-line entry point: print a customer's messages from the INBOX.

    Parses ``--email`` (required), ``--days`` (default 60) and ``--json``,
    calls ``fetch_customer_messages`` and prints the result to stdout, as
    JSON or as a plain-text listing.

    Exit codes: 0 when messages were printed, 1 when none were found,
    2 when fetching raised an exception.
    """
    cli = argparse.ArgumentParser(description="Pobierz wiadomosci klienta z INBOX")
    cli.add_argument("--email", required=True, help="Adres email klienta (nadawca)")
    cli.add_argument("--days", type=int, default=60, help="Ile dni wstecz szukac (domyslnie 60)")
    cli.add_argument("--json", action="store_true", help="Wyjscie w formacie JSON")
    options = cli.parse_args()

    try:
        messages = fetch_customer_messages(options.email, days_back=options.days)
    except Exception as failure:
        # Top-level boundary: report any failure on stdout and exit non-zero.
        print(f"BLAD: {failure}")
        sys.exit(2)

    if not messages:
        print("BRAK: nie znaleziono wiadomosci od tego klienta w INBOX")
        sys.exit(1)

    if options.json:
        print(json.dumps(messages, ensure_ascii=False, indent=2))
        sys.exit(0)

    for position, message in enumerate(messages, start=1):
        print(f"=== [{position}] {message['date']} | {message['subject']} ===")
        print(message["body"])
        print()
    sys.exit(0)
|
|
|
|
|
|
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|