Files
google-ads-ver-2/scripts/lib/gads_client.py
2026-05-15 09:28:11 +02:00

173 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Wspólna biblioteka Google Ads API.
Użycie:
from lib.gads_client import get_client, get_customer_id, run_query, write_csv
get_customer_id("laitica.pl") -> "2625677205"
get_customer_id("262-567-7205") -> "2625677205"
get_customer_id("2625677205") -> "2625677205"
"""
import csv
import io
import os
import re
import sys
import threading
import time
from contextlib import contextmanager
from pathlib import Path
from google.ads.googleads.client import GoogleAdsClient
ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT))
from src.gads_v2.config import load_config, load_env
load_env(ROOT / ".env")
# Wymuszamy UTF-8 na stdout — raz, przy pierwszym imporcie
if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding.lower().replace("-", "") != "utf8":
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
def get_customer_id(customer: str) -> str:
"""
Zwraca customer_id (string cyfr bez myślników).
Przyjmuje:
- domenę: "laitica.pl" -> szuka GOOGLE_ACCOUNT_ID_laiticapl
- customer_id: "262-567-7205" lub "2625677205"
"""
# Już jest numeryczny (z myślnikami lub bez)
if re.fullmatch(r"[\d-]+", customer):
return customer.replace("-", "")
try:
cfg = load_config()
if customer in cfg.clients:
return cfg.clients[customer].safe_customer_id
except Exception:
pass
# Domena -> klucz środowiska (próbuj bez znaków specjalnych, potem oryginał)
env_key = "GOOGLE_ACCOUNT_ID_" + re.sub(r"[.\-]", "", customer)
value = os.environ.get(env_key)
if not value:
env_key = "GOOGLE_ACCOUNT_ID_" + customer
value = os.environ.get(env_key)
if not value:
raise ValueError(
f"Nie znaleziono {env_key} w .env. "
f"Dostępne klucze: {[k for k in os.environ if k.startswith('GOOGLE_ACCOUNT_ID_')]}"
)
return value.replace("-", "")
def get_client(use_proto_plus: bool = True) -> GoogleAdsClient:
"""Tworzy klienta Google Ads API."""
return GoogleAdsClient.load_from_dict(
{
"developer_token": os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ["GOOGLE_ADS_DEVELOPER_TOKNE"],
"client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"],
"client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"],
"refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"],
"login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"],
"use_proto_plus": use_proto_plus,
}
)
def run_query(
client: GoogleAdsClient,
customer_id: str,
query: str,
timeout: float | None = 300.0,
) -> list:
"""Wykonuje zapytanie GAQL i zwraca listę wyników.
`timeout` (sekundy) jest przekazany do gRPC. Default 300s (5 min) — chroni
przed cichym wiszącym RPC. Po przekroczeniu rzuca jasny wyjątek z hintem.
Pass `timeout=None` aby wyłączyć (rzadko potrzebne — patrz `feedback_script_timeout_handling.md`).
Note: SDK Google Ads Python ma wbudowany retry policy dla unary RPC (~5 attempts × exponential backoff).
`search_stream` jako server-streaming nie korzysta z retry per-batch — timeout jest tu twardym capem.
Override SDK retry policy wymaga edycji `grpc_service_config.json` wewnątrz pakietu — niepraktyczne.
Dla agresywniejszego anti-throttling: zmniejsz `timeout` (np. 60s) i obsłuż `DeadlineExceeded` w skrypcie.
"""
service = client.get_service("GoogleAdsService")
kwargs = {"customer_id": customer_id, "query": query}
if timeout is not None:
kwargs["timeout"] = timeout
rows = []
try:
for batch in service.search_stream(**kwargs):
for row in batch.results:
rows.append(row)
except Exception as e:
# Translate gRPC DeadlineExceeded / Aborted to actionable message
msg = str(e)
if "DEADLINE_EXCEEDED" in msg or "Deadline" in msg or "deadline" in msg:
raise RuntimeError(
f"GAQL query przekroczyło timeout {timeout}s. Sugestie: "
f"(1) dodaj filtr `--campaign-id` lub `LIMIT N` w GAQL, "
f"(2) skróć zakres dat (`segments.date BETWEEN ...`), "
f"(3) podziel query na mniejsze segmenty. "
f"Original: {msg[:200]}"
) from e
raise
return rows
def write_csv(path: Path, rows: list[dict]) -> None:
"""Zapisuje listę słowników do CSV (UTF-8 BOM, Excel-friendly)."""
if not rows:
print(f" Brak danych — pomijam {path.name}")
return
with open(path, "w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
writer.writerows(rows)
print(f" Zapisano {len(rows)} wierszy -> {path.name}")
@contextmanager
def heartbeat(label: str = "still working", interval: float = 10.0, file=sys.stderr):
"""
Context manager — pisze co `interval` sekund komunikat `[Ns] {label}...` do stderr.
Eliminuje wrażenie zawieszenia w długich skryptach (Google Ads API throttling, paginacja, retry).
Użycie:
with heartbeat("fetching ad_schedule"):
rows = run_query(client, customer_id, query)
Przerywa się automatycznie po wyjściu z bloku. Jeśli skrypt zakończy <interval s, nic nie wypisze.
"""
stop = threading.Event()
start = time.time()
def _tick():
while not stop.wait(interval):
elapsed = int(time.time() - start)
print(f" [{elapsed}s] {label}...", file=file, flush=True)
t = threading.Thread(target=_tick, daemon=True)
t.start()
try:
yield
finally:
stop.set()
t.join(timeout=0.5)
def output_dir(customer: str) -> Path:
"""Zwraca ścieżkę do folderu danych klienta, tworzy jeśli nie istnieje."""
# Próbuj znaleźć katalog po domenie
if not re.fullmatch(r"[\d-]+", customer):
d = ROOT / "clients" / customer
else:
# Dla ID szukaj folderu po wartości z .env
d = ROOT / "clients" / customer
d.mkdir(parents=True, exist_ok=True)
return d