""" Wspólna biblioteka Google Ads API. Użycie: from lib.gads_client import get_client, get_customer_id, run_query, write_csv get_customer_id("laitica.pl") -> "2625677205" get_customer_id("262-567-7205") -> "2625677205" get_customer_id("2625677205") -> "2625677205" """ import csv import io import os import re import sys import threading import time from contextlib import contextmanager from pathlib import Path from google.ads.googleads.client import GoogleAdsClient ROOT = Path(__file__).parent.parent.parent sys.path.insert(0, str(ROOT)) from src.gads_v2.config import load_config, load_env load_env(ROOT / ".env") # Wymuszamy UTF-8 na stdout — raz, przy pierwszym imporcie if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding.lower().replace("-", "") != "utf8": sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") def get_customer_id(customer: str) -> str: """ Zwraca customer_id (string cyfr bez myślników). Przyjmuje: - domenę: "laitica.pl" -> szuka GOOGLE_ACCOUNT_ID_laiticapl - customer_id: "262-567-7205" lub "2625677205" """ # Już jest numeryczny (z myślnikami lub bez) if re.fullmatch(r"[\d-]+", customer): return customer.replace("-", "") try: cfg = load_config() if customer in cfg.clients: return cfg.clients[customer].safe_customer_id except Exception: pass # Domena -> klucz środowiska (próbuj bez znaków specjalnych, potem oryginał) env_key = "GOOGLE_ACCOUNT_ID_" + re.sub(r"[.\-]", "", customer) value = os.environ.get(env_key) if not value: env_key = "GOOGLE_ACCOUNT_ID_" + customer value = os.environ.get(env_key) if not value: raise ValueError( f"Nie znaleziono {env_key} w .env. " f"Dostępne klucze: {[k for k in os.environ if k.startswith('GOOGLE_ACCOUNT_ID_')]}" ) return value.replace("-", "") def get_client(use_proto_plus: bool = True) -> GoogleAdsClient: """Tworzy klienta Google Ads API.""" return GoogleAdsClient.load_from_dict( { "developer_token": os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ["GOOGLE_ADS_DEVELOPER_TOKNE"], "client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"], "client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"], "refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"], "login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"], "use_proto_plus": use_proto_plus, } ) def run_query( client: GoogleAdsClient, customer_id: str, query: str, timeout: float | None = 300.0, ) -> list: """Wykonuje zapytanie GAQL i zwraca listę wyników. `timeout` (sekundy) jest przekazany do gRPC. Default 300s (5 min) — chroni przed cichym wiszącym RPC. Po przekroczeniu rzuca jasny wyjątek z hintem. Pass `timeout=None` aby wyłączyć (rzadko potrzebne — patrz `feedback_script_timeout_handling.md`). Note: SDK Google Ads Python ma wbudowany retry policy dla unary RPC (~5 attempts × exponential backoff). `search_stream` jako server-streaming nie korzysta z retry per-batch — timeout jest tu twardym capem. Override SDK retry policy wymaga edycji `grpc_service_config.json` wewnątrz pakietu — niepraktyczne. Dla agresywniejszego anti-throttling: zmniejsz `timeout` (np. 60s) i obsłuż `DeadlineExceeded` w skrypcie. """ service = client.get_service("GoogleAdsService") kwargs = {"customer_id": customer_id, "query": query} if timeout is not None: kwargs["timeout"] = timeout rows = [] try: for batch in service.search_stream(**kwargs): for row in batch.results: rows.append(row) except Exception as e: # Translate gRPC DeadlineExceeded / Aborted to actionable message msg = str(e) if "DEADLINE_EXCEEDED" in msg or "Deadline" in msg or "deadline" in msg: raise RuntimeError( f"GAQL query przekroczyło timeout {timeout}s. Sugestie: " f"(1) dodaj filtr `--campaign-id` lub `LIMIT N` w GAQL, " f"(2) skróć zakres dat (`segments.date BETWEEN ...`), " f"(3) podziel query na mniejsze segmenty. " f"Original: {msg[:200]}" ) from e raise return rows def write_csv(path: Path, rows: list[dict]) -> None: """Zapisuje listę słowników do CSV (UTF-8 BOM, Excel-friendly).""" if not rows: print(f" Brak danych — pomijam {path.name}") return with open(path, "w", newline="", encoding="utf-8-sig") as f: writer = csv.DictWriter(f, fieldnames=rows[0].keys()) writer.writeheader() writer.writerows(rows) print(f" Zapisano {len(rows)} wierszy -> {path.name}") @contextmanager def heartbeat(label: str = "still working", interval: float = 10.0, file=sys.stderr): """ Context manager — pisze co `interval` sekund komunikat `[Ns] {label}...` do stderr. Eliminuje wrażenie zawieszenia w długich skryptach (Google Ads API throttling, paginacja, retry). Użycie: with heartbeat("fetching ad_schedule"): rows = run_query(client, customer_id, query) Przerywa się automatycznie po wyjściu z bloku. Jeśli skrypt zakończy Path: """Zwraca ścieżkę do folderu danych klienta, tworzy jeśli nie istnieje.""" # Próbuj znaleźć katalog po domenie if not re.fullmatch(r"[\d-]+", customer): d = ROOT / "clients" / customer else: # Dla ID szukaj folderu po wartości z .env d = ROOT / "clients" / customer d.mkdir(parents=True, exist_ok=True) return d