173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
"""
|
||
Wspólna biblioteka Google Ads API.
|
||
|
||
Użycie:
|
||
from lib.gads_client import get_client, get_customer_id, run_query, write_csv
|
||
|
||
get_customer_id("laitica.pl") -> "2625677205"
|
||
get_customer_id("262-567-7205") -> "2625677205"
|
||
get_customer_id("2625677205") -> "2625677205"
|
||
"""
|
||
|
||
import csv
|
||
import io
|
||
import os
|
||
import re
|
||
import sys
|
||
import threading
|
||
import time
|
||
from contextlib import contextmanager
|
||
from pathlib import Path
|
||
|
||
from google.ads.googleads.client import GoogleAdsClient
|
||
|
||
ROOT = Path(__file__).parent.parent.parent
|
||
sys.path.insert(0, str(ROOT))
|
||
from src.gads_v2.config import load_config, load_env
|
||
|
||
load_env(ROOT / ".env")
|
||
|
||
# Wymuszamy UTF-8 na stdout — raz, przy pierwszym imporcie
|
||
if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding.lower().replace("-", "") != "utf8":
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||
|
||
|
||
def get_customer_id(customer: str) -> str:
|
||
"""
|
||
Zwraca customer_id (string cyfr bez myślników).
|
||
Przyjmuje:
|
||
- domenę: "laitica.pl" -> szuka GOOGLE_ACCOUNT_ID_laiticapl
|
||
- customer_id: "262-567-7205" lub "2625677205"
|
||
"""
|
||
# Już jest numeryczny (z myślnikami lub bez)
|
||
if re.fullmatch(r"[\d-]+", customer):
|
||
return customer.replace("-", "")
|
||
|
||
try:
|
||
cfg = load_config()
|
||
if customer in cfg.clients:
|
||
return cfg.clients[customer].safe_customer_id
|
||
except Exception:
|
||
pass
|
||
|
||
# Domena -> klucz środowiska (próbuj bez znaków specjalnych, potem oryginał)
|
||
env_key = "GOOGLE_ACCOUNT_ID_" + re.sub(r"[.\-]", "", customer)
|
||
value = os.environ.get(env_key)
|
||
if not value:
|
||
env_key = "GOOGLE_ACCOUNT_ID_" + customer
|
||
value = os.environ.get(env_key)
|
||
if not value:
|
||
raise ValueError(
|
||
f"Nie znaleziono {env_key} w .env. "
|
||
f"Dostępne klucze: {[k for k in os.environ if k.startswith('GOOGLE_ACCOUNT_ID_')]}"
|
||
)
|
||
return value.replace("-", "")
|
||
|
||
|
||
def get_client(use_proto_plus: bool = True) -> GoogleAdsClient:
|
||
"""Tworzy klienta Google Ads API."""
|
||
return GoogleAdsClient.load_from_dict(
|
||
{
|
||
"developer_token": os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ["GOOGLE_ADS_DEVELOPER_TOKNE"],
|
||
"client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"],
|
||
"client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"],
|
||
"refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"],
|
||
"login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"],
|
||
"use_proto_plus": use_proto_plus,
|
||
}
|
||
)
|
||
|
||
|
||
def run_query(
|
||
client: GoogleAdsClient,
|
||
customer_id: str,
|
||
query: str,
|
||
timeout: float | None = 300.0,
|
||
) -> list:
|
||
"""Wykonuje zapytanie GAQL i zwraca listę wyników.
|
||
|
||
`timeout` (sekundy) jest przekazany do gRPC. Default 300s (5 min) — chroni
|
||
przed cichym wiszącym RPC. Po przekroczeniu rzuca jasny wyjątek z hintem.
|
||
Pass `timeout=None` aby wyłączyć (rzadko potrzebne — patrz `feedback_script_timeout_handling.md`).
|
||
|
||
Note: SDK Google Ads Python ma wbudowany retry policy dla unary RPC (~5 attempts × exponential backoff).
|
||
`search_stream` jako server-streaming nie korzysta z retry per-batch — timeout jest tu twardym capem.
|
||
Override SDK retry policy wymaga edycji `grpc_service_config.json` wewnątrz pakietu — niepraktyczne.
|
||
Dla agresywniejszego anti-throttling: zmniejsz `timeout` (np. 60s) i obsłuż `DeadlineExceeded` w skrypcie.
|
||
"""
|
||
service = client.get_service("GoogleAdsService")
|
||
kwargs = {"customer_id": customer_id, "query": query}
|
||
if timeout is not None:
|
||
kwargs["timeout"] = timeout
|
||
rows = []
|
||
try:
|
||
for batch in service.search_stream(**kwargs):
|
||
for row in batch.results:
|
||
rows.append(row)
|
||
except Exception as e:
|
||
# Translate gRPC DeadlineExceeded / Aborted to actionable message
|
||
msg = str(e)
|
||
if "DEADLINE_EXCEEDED" in msg or "Deadline" in msg or "deadline" in msg:
|
||
raise RuntimeError(
|
||
f"GAQL query przekroczyło timeout {timeout}s. Sugestie: "
|
||
f"(1) dodaj filtr `--campaign-id` lub `LIMIT N` w GAQL, "
|
||
f"(2) skróć zakres dat (`segments.date BETWEEN ...`), "
|
||
f"(3) podziel query na mniejsze segmenty. "
|
||
f"Original: {msg[:200]}"
|
||
) from e
|
||
raise
|
||
return rows
|
||
|
||
|
||
def write_csv(path: Path, rows: list[dict]) -> None:
|
||
"""Zapisuje listę słowników do CSV (UTF-8 BOM, Excel-friendly)."""
|
||
if not rows:
|
||
print(f" Brak danych — pomijam {path.name}")
|
||
return
|
||
with open(path, "w", newline="", encoding="utf-8-sig") as f:
|
||
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
|
||
writer.writeheader()
|
||
writer.writerows(rows)
|
||
print(f" Zapisano {len(rows)} wierszy -> {path.name}")
|
||
|
||
|
||
@contextmanager
|
||
def heartbeat(label: str = "still working", interval: float = 10.0, file=sys.stderr):
|
||
"""
|
||
Context manager — pisze co `interval` sekund komunikat `[Ns] {label}...` do stderr.
|
||
Eliminuje wrażenie zawieszenia w długich skryptach (Google Ads API throttling, paginacja, retry).
|
||
|
||
Użycie:
|
||
with heartbeat("fetching ad_schedule"):
|
||
rows = run_query(client, customer_id, query)
|
||
|
||
Przerywa się automatycznie po wyjściu z bloku. Jeśli skrypt zakończy <interval s, nic nie wypisze.
|
||
"""
|
||
stop = threading.Event()
|
||
start = time.time()
|
||
|
||
def _tick():
|
||
while not stop.wait(interval):
|
||
elapsed = int(time.time() - start)
|
||
print(f" [{elapsed}s] {label}...", file=file, flush=True)
|
||
|
||
t = threading.Thread(target=_tick, daemon=True)
|
||
t.start()
|
||
try:
|
||
yield
|
||
finally:
|
||
stop.set()
|
||
t.join(timeout=0.5)
|
||
|
||
|
||
def output_dir(customer: str) -> Path:
|
||
"""Zwraca ścieżkę do folderu danych klienta, tworzy jeśli nie istnieje."""
|
||
# Próbuj znaleźć katalog po domenie
|
||
if not re.fullmatch(r"[\d-]+", customer):
|
||
d = ROOT / "clients" / customer
|
||
else:
|
||
# Dla ID szukaj folderu po wartości z .env
|
||
d = ROOT / "clients" / customer
|
||
d.mkdir(parents=True, exist_ok=True)
|
||
return d
|