""" Pobiera dane zamówień z Shopify Admin przez Playwright. Loguje się do panelu, przechodzi do Analytics > Reports, pobiera dane sprzedaży. Użycie: python scripts/reports/fetch_shopify_orders.py --customer laitica.pl --month 2026-03 """ import argparse import json import os import re import sys import time from datetime import datetime, timedelta from pathlib import Path from dotenv import load_dotenv from playwright.sync_api import sync_playwright, TimeoutError as PlaywrightTimeout def get_month_range(month_str: str): """Zwraca (first_day, last_day) dla danego miesiąca YYYY-MM.""" year, month = map(int, month_str.split("-")) first_day = datetime(year, month, 1) if month == 12: last_day = datetime(year + 1, 1, 1) - timedelta(days=1) else: last_day = datetime(year, month + 1, 1) - timedelta(days=1) return first_day.strftime("%Y-%m-%d"), last_day.strftime("%Y-%m-%d") def main(): parser = argparse.ArgumentParser(description="Pobierz zamówienia z Shopify Admin") parser.add_argument("--customer", required=True, help="Domena klienta") parser.add_argument("--month", required=True, help="Miesiąc YYYY-MM") parser.add_argument("--headless", action="store_true", help="Tryb headless") args = parser.parse_args() load_dotenv(Path(__file__).resolve().parents[2] / ".env") domain = args.customer admin_url = os.environ.get(f"SHOPIFY_ADMIN_URL_{domain}") login_email = os.environ.get(f"SHOPIFY_LOGIN_{domain}") login_password = os.environ.get(f"SHOPIFY_PASSWORD_{domain}") if not all([admin_url, login_email, login_password]): print(f"Brak danych logowania w .env dla {domain}") sys.exit(1) first_day, last_day = get_month_range(args.month) print(f"Shopify Admin: {admin_url}") print(f"Okres: {first_day} — {last_day}") with sync_playwright() as p: browser = p.chromium.launch(headless=args.headless) context = browser.new_context( viewport={"width": 1280, "height": 900}, locale="pl-PL", ) page = context.new_page() # --- Logowanie ręczne --- print("\n1. Otwieram stronę logowania Shopify...") print(">>> ZALOGUJ SIĘ RĘCZNIE W OKNIE PRZEGLĄDARKI <<<") print(">>> Czekam max 180 sekund na zalogowanie...\n") page.goto(admin_url, wait_until="domcontentloaded", timeout=30000) # Czekaj aż URL będzie wskazywał na zalogowany admin for i in range(180): time.sleep(1) current_url = page.url if "/store/" in current_url and "accounts.shopify.com" not in current_url and "lookup" not in current_url: print(f" Zalogowano! ({current_url})") break if i % 15 == 0 and i > 0: print(f" Czekam na logowanie... ({i}s)") else: print(" Timeout 180s — nie zalogowano.") browser.close() sys.exit(1) # Przejdź do admina sklepu print("2. Przechodzę do admina sklepu...") page.goto(f"{admin_url}/orders?status=any", wait_until="networkidle", timeout=30000) time.sleep(3) # --- Pobieranie zamówień przez URL z filtrami dat --- print(f"3. Pobieram zamówienia za {args.month}...") # Shopify Admin API endpoint przez stronę # Używamy filtrów w URL zamówień orders_url = ( f"{admin_url}/orders.json" f"?status=any" f"&created_at_min={first_day}T00:00:00" f"&created_at_max={last_day}T23:59:59" f"&limit=250" ) # Próba pobrania przez API endpoint (admin jest zalogowany) response = page.goto(orders_url, wait_until="networkidle", timeout=30000) orders_data = None if response and response.status == 200: try: body = page.locator("body").inner_text() orders_data = json.loads(body) print(f" Pobrano dane JSON: {len(orders_data.get('orders', []))} zamówień") except (json.JSONDecodeError, Exception): print(" Nie udało się sparsować JSON z orders.json") # Jeśli API nie zadziałało, pobierz z UI if not orders_data: print(" Próbuję pobrać z Analytics...") # Przejdź do raportu sprzedaży analytics_url = ( f"{admin_url}/analytics/reports/finances_summary" f"?since={first_day}&until={last_day}" ) page.goto(analytics_url, wait_until="networkidle", timeout=30000) time.sleep(5) # Screenshot dla debugowania screenshot_path = Path(__file__).parent / "output" / f"shopify_debug_{domain}.png" page.screenshot(path=str(screenshot_path), full_page=True) print(f" Screenshot: {screenshot_path}") # Próba wyciągnięcia danych z Analytics page page_text = page.inner_text("body") print(f" Tekst strony (pierwsze 2000 znaków):") print(f" {page_text[:2000]}") # Fallback — przejdź do orders z filtrem dat print("\n Fallback: liczę zamówienia z listy orders...") page.goto( f"{admin_url}/orders?inContextTimelineDate%5Bgte%5D={first_day}" f"&inContextTimelineDate%5Blte%5D={last_day}&status=any", wait_until="networkidle", timeout=30000, ) time.sleep(5) screenshot_path2 = Path(__file__).parent / "output" / f"shopify_orders_{domain}.png" page.screenshot(path=str(screenshot_path2), full_page=True) print(f" Screenshot orders: {screenshot_path2}") page_text = page.inner_text("body") print(f" Orders page text (2000 chars):") print(f" {page_text[:2000]}") # Przetwórz dane zamówień (jeśli mamy JSON) if orders_data and "orders" in orders_data: orders = orders_data["orders"] # Filtruj tylko opłacone/zrealizowane paid_orders = [ o for o in orders if o.get("financial_status") in ("paid", "partially_refunded", "refunded") or o.get("fulfillment_status") in ("fulfilled", "partial", None) ] total_revenue = sum(float(o.get("total_price", 0)) for o in paid_orders) total_orders = len(paid_orders) aov = total_revenue / total_orders if total_orders > 0 else 0 result = { "source": "shopify_admin", "month": args.month, "transactions": total_orders, "revenue": round(total_revenue, 2), "aov": round(aov, 2), "orders_detail": [ { "id": o.get("name", o.get("id")), "date": o.get("created_at", "")[:10], "total": float(o.get("total_price", 0)), "status": o.get("financial_status", ""), } for o in paid_orders ], } # Zapisz out_dir = Path(__file__).parent / "output" out_dir.mkdir(exist_ok=True) out_path = out_dir / f"shopify_{domain}_{args.month}.json" with open(out_path, "w", encoding="utf-8") as f: json.dump(result, f, indent=2, ensure_ascii=False) print(f"\n=== WYNIK ===") print(f"Zamówienia: {total_orders}") print(f"Przychód: {total_revenue:.2f} PLN") print(f"AOV: {aov:.2f} PLN") print(f"Zapisano: {out_path}") else: print("\nNie udało się automatycznie pobrać danych.") print("Sprawdź screenshoty i spróbuj ręcznie podać dane.") browser.close() if __name__ == "__main__": main()