update
This commit is contained in:
169
tools/generowanie/email_message_fetcher.py
Normal file
169
tools/generowanie/email_message_fetcher.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""
|
||||
Pobieranie wiadomosci tekstowych od klientow z poczty IMAP.
|
||||
|
||||
Bliznaczy modul do `email_photo_fetcher.py`, ale zamiast zalacznikow graficznych
|
||||
zwraca tresc tekstowa wiadomosci (body) od podanego nadawcy. Uzywany przez silnik
|
||||
`/wygeneruj-projekty` aby wykryc dodatkowe poprawki/uwagi ktore klient przysylal
|
||||
mailem juz po zlozeniu zamowienia.
|
||||
|
||||
Skrzynka: EMAIL_01, folder: INBOX (na stale — zgodnie z wymaganiem).
|
||||
|
||||
Wymagane zmienne srodowiskowe (.env w katalogu projektu):
|
||||
EMAIL_01_HOST
|
||||
EMAIL_01_USERNAME
|
||||
EMAIL_01_PASSWORD
|
||||
EMAIL_01_IMAP_PORT - opcjonalnie, domyslnie 993
|
||||
|
||||
Uzycie programowe:
|
||||
from email_message_fetcher import fetch_customer_messages
|
||||
items = fetch_customer_messages("klient@example.com", days_back=60)
|
||||
# items: list[dict] z kluczami: date, subject, body
|
||||
|
||||
Uzycie z CLI:
|
||||
python email_message_fetcher.py --email klient@example.com --days 60
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import email
|
||||
import imaplib
|
||||
import json
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from email.header import decode_header
|
||||
|
||||
from email_photo_fetcher import _imap_config # reuse env loader
|
||||
|
||||
|
||||
def _decode_header_value(raw):
|
||||
if not raw:
|
||||
return ""
|
||||
parts = decode_header(raw)
|
||||
out = []
|
||||
for chunk, enc in parts:
|
||||
if isinstance(chunk, bytes):
|
||||
try:
|
||||
out.append(chunk.decode(enc or "utf-8", errors="replace"))
|
||||
except LookupError:
|
||||
out.append(chunk.decode("utf-8", errors="replace"))
|
||||
else:
|
||||
out.append(chunk)
|
||||
return "".join(out).strip()
|
||||
|
||||
|
||||
def _extract_text_body(msg):
|
||||
"""Zwraca text/plain z wiadomosci; gdy brak — fallback do text/html (bez tagow)."""
|
||||
plain_parts = []
|
||||
html_parts = []
|
||||
|
||||
for part in msg.walk():
|
||||
if part.get_content_maintype() == "multipart":
|
||||
continue
|
||||
disposition = str(part.get("Content-Disposition") or "").lower()
|
||||
if "attachment" in disposition:
|
||||
continue
|
||||
|
||||
ctype = part.get_content_type()
|
||||
payload = part.get_payload(decode=True)
|
||||
if not payload:
|
||||
continue
|
||||
charset = part.get_content_charset() or "utf-8"
|
||||
try:
|
||||
text = payload.decode(charset, errors="replace")
|
||||
except LookupError:
|
||||
text = payload.decode("utf-8", errors="replace")
|
||||
|
||||
if ctype == "text/plain":
|
||||
plain_parts.append(text)
|
||||
elif ctype == "text/html":
|
||||
html_parts.append(text)
|
||||
|
||||
if plain_parts:
|
||||
return "\n".join(p.strip() for p in plain_parts).strip()
|
||||
|
||||
if html_parts:
|
||||
import re
|
||||
raw = "\n".join(html_parts)
|
||||
raw = re.sub(r"(?is)<(script|style).*?>.*?</\1>", " ", raw)
|
||||
raw = re.sub(r"(?s)<[^>]+>", " ", raw)
|
||||
raw = re.sub(r"[ \t]+", " ", raw)
|
||||
raw = re.sub(r"\s*\n\s*", "\n", raw)
|
||||
return raw.strip()
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def fetch_customer_messages(customer_email, days_back=60):
|
||||
"""Szuka maili od `customer_email` w INBOX z ostatnich `days_back` dni.
|
||||
Zwraca liste dict {date, subject, body}, od najnowszego do najstarszego."""
|
||||
cfg = _imap_config()
|
||||
if not cfg["host"] or not cfg["user"] or not cfg["password"]:
|
||||
raise RuntimeError(
|
||||
"Brak konfiguracji konta w .env — wymagane: EMAIL_01_HOST, EMAIL_01_USERNAME, EMAIL_01_PASSWORD"
|
||||
)
|
||||
|
||||
imap = imaplib.IMAP4_SSL(cfg["host"], cfg["port"])
|
||||
results = []
|
||||
|
||||
try:
|
||||
imap.login(cfg["user"], cfg["password"])
|
||||
imap.select("INBOX")
|
||||
|
||||
since = (datetime.now() - timedelta(days=days_back)).strftime("%d-%b-%Y")
|
||||
criteria = f'(FROM "{customer_email}" SINCE "{since}")'
|
||||
status, data = imap.search(None, criteria)
|
||||
if status != "OK" or not data or not data[0]:
|
||||
return results
|
||||
|
||||
ids = data[0].split()
|
||||
for msg_id in reversed(ids):
|
||||
status, msg_data = imap.fetch(msg_id, "(RFC822)")
|
||||
if status != "OK" or not msg_data or not msg_data[0]:
|
||||
continue
|
||||
msg = email.message_from_bytes(msg_data[0][1])
|
||||
|
||||
subject = _decode_header_value(msg.get("Subject"))
|
||||
date_hdr = msg.get("Date") or ""
|
||||
body = _extract_text_body(msg)
|
||||
|
||||
if not body:
|
||||
continue
|
||||
|
||||
results.append({"date": date_hdr, "subject": subject, "body": body})
|
||||
|
||||
return results
|
||||
finally:
|
||||
try:
|
||||
imap.logout()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Pobierz wiadomosci klienta z INBOX")
|
||||
parser.add_argument("--email", required=True, help="Adres email klienta (nadawca)")
|
||||
parser.add_argument("--days", type=int, default=60, help="Ile dni wstecz szukac (domyslnie 60)")
|
||||
parser.add_argument("--json", action="store_true", help="Wyjscie w formacie JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
items = fetch_customer_messages(args.email, days_back=args.days)
|
||||
except Exception as exc:
|
||||
print(f"BLAD: {exc}")
|
||||
sys.exit(2)
|
||||
|
||||
if not items:
|
||||
print("BRAK: nie znaleziono wiadomosci od tego klienta w INBOX")
|
||||
sys.exit(1)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(items, ensure_ascii=False, indent=2))
|
||||
else:
|
||||
for i, it in enumerate(items, 1):
|
||||
print(f"=== [{i}] {it['date']} | {it['subject']} ===")
|
||||
print(it["body"])
|
||||
print()
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user