first commit
This commit is contained in:
172
scripts/lib/gads_client.py
Normal file
172
scripts/lib/gads_client.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""
|
||||
Wspólna biblioteka Google Ads API.
|
||||
|
||||
Użycie:
|
||||
from lib.gads_client import get_client, get_customer_id, run_query, write_csv
|
||||
|
||||
get_customer_id("laitica.pl") -> "2625677205"
|
||||
get_customer_id("262-567-7205") -> "2625677205"
|
||||
get_customer_id("2625677205") -> "2625677205"
|
||||
"""
|
||||
|
||||
import csv
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from google.ads.googleads.client import GoogleAdsClient
|
||||
|
||||
ROOT = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(ROOT))
|
||||
from src.gads_v2.config import load_config, load_env
|
||||
|
||||
load_env(ROOT / ".env")
|
||||
|
||||
# Wymuszamy UTF-8 na stdout — raz, przy pierwszym imporcie
|
||||
if not isinstance(sys.stdout, io.TextIOWrapper) or sys.stdout.encoding.lower().replace("-", "") != "utf8":
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
|
||||
|
||||
def get_customer_id(customer: str) -> str:
|
||||
"""
|
||||
Zwraca customer_id (string cyfr bez myślników).
|
||||
Przyjmuje:
|
||||
- domenę: "laitica.pl" -> szuka GOOGLE_ACCOUNT_ID_laiticapl
|
||||
- customer_id: "262-567-7205" lub "2625677205"
|
||||
"""
|
||||
# Już jest numeryczny (z myślnikami lub bez)
|
||||
if re.fullmatch(r"[\d-]+", customer):
|
||||
return customer.replace("-", "")
|
||||
|
||||
try:
|
||||
cfg = load_config()
|
||||
if customer in cfg.clients:
|
||||
return cfg.clients[customer].safe_customer_id
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Domena -> klucz środowiska (próbuj bez znaków specjalnych, potem oryginał)
|
||||
env_key = "GOOGLE_ACCOUNT_ID_" + re.sub(r"[.\-]", "", customer)
|
||||
value = os.environ.get(env_key)
|
||||
if not value:
|
||||
env_key = "GOOGLE_ACCOUNT_ID_" + customer
|
||||
value = os.environ.get(env_key)
|
||||
if not value:
|
||||
raise ValueError(
|
||||
f"Nie znaleziono {env_key} w .env. "
|
||||
f"Dostępne klucze: {[k for k in os.environ if k.startswith('GOOGLE_ACCOUNT_ID_')]}"
|
||||
)
|
||||
return value.replace("-", "")
|
||||
|
||||
|
||||
def get_client(use_proto_plus: bool = True) -> GoogleAdsClient:
|
||||
"""Tworzy klienta Google Ads API."""
|
||||
return GoogleAdsClient.load_from_dict(
|
||||
{
|
||||
"developer_token": os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ["GOOGLE_ADS_DEVELOPER_TOKNE"],
|
||||
"client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"],
|
||||
"client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"],
|
||||
"refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"],
|
||||
"login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"],
|
||||
"use_proto_plus": use_proto_plus,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def run_query(
|
||||
client: GoogleAdsClient,
|
||||
customer_id: str,
|
||||
query: str,
|
||||
timeout: float | None = 300.0,
|
||||
) -> list:
|
||||
"""Wykonuje zapytanie GAQL i zwraca listę wyników.
|
||||
|
||||
`timeout` (sekundy) jest przekazany do gRPC. Default 300s (5 min) — chroni
|
||||
przed cichym wiszącym RPC. Po przekroczeniu rzuca jasny wyjątek z hintem.
|
||||
Pass `timeout=None` aby wyłączyć (rzadko potrzebne — patrz `feedback_script_timeout_handling.md`).
|
||||
|
||||
Note: SDK Google Ads Python ma wbudowany retry policy dla unary RPC (~5 attempts × exponential backoff).
|
||||
`search_stream` jako server-streaming nie korzysta z retry per-batch — timeout jest tu twardym capem.
|
||||
Override SDK retry policy wymaga edycji `grpc_service_config.json` wewnątrz pakietu — niepraktyczne.
|
||||
Dla agresywniejszego anti-throttling: zmniejsz `timeout` (np. 60s) i obsłuż `DeadlineExceeded` w skrypcie.
|
||||
"""
|
||||
service = client.get_service("GoogleAdsService")
|
||||
kwargs = {"customer_id": customer_id, "query": query}
|
||||
if timeout is not None:
|
||||
kwargs["timeout"] = timeout
|
||||
rows = []
|
||||
try:
|
||||
for batch in service.search_stream(**kwargs):
|
||||
for row in batch.results:
|
||||
rows.append(row)
|
||||
except Exception as e:
|
||||
# Translate gRPC DeadlineExceeded / Aborted to actionable message
|
||||
msg = str(e)
|
||||
if "DEADLINE_EXCEEDED" in msg or "Deadline" in msg or "deadline" in msg:
|
||||
raise RuntimeError(
|
||||
f"GAQL query przekroczyło timeout {timeout}s. Sugestie: "
|
||||
f"(1) dodaj filtr `--campaign-id` lub `LIMIT N` w GAQL, "
|
||||
f"(2) skróć zakres dat (`segments.date BETWEEN ...`), "
|
||||
f"(3) podziel query na mniejsze segmenty. "
|
||||
f"Original: {msg[:200]}"
|
||||
) from e
|
||||
raise
|
||||
return rows
|
||||
|
||||
|
||||
def write_csv(path: Path, rows: list[dict]) -> None:
|
||||
"""Zapisuje listę słowników do CSV (UTF-8 BOM, Excel-friendly)."""
|
||||
if not rows:
|
||||
print(f" Brak danych — pomijam {path.name}")
|
||||
return
|
||||
with open(path, "w", newline="", encoding="utf-8-sig") as f:
|
||||
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
print(f" Zapisano {len(rows)} wierszy -> {path.name}")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def heartbeat(label: str = "still working", interval: float = 10.0, file=sys.stderr):
|
||||
"""
|
||||
Context manager — pisze co `interval` sekund komunikat `[Ns] {label}...` do stderr.
|
||||
Eliminuje wrażenie zawieszenia w długich skryptach (Google Ads API throttling, paginacja, retry).
|
||||
|
||||
Użycie:
|
||||
with heartbeat("fetching ad_schedule"):
|
||||
rows = run_query(client, customer_id, query)
|
||||
|
||||
Przerywa się automatycznie po wyjściu z bloku. Jeśli skrypt zakończy <interval s, nic nie wypisze.
|
||||
"""
|
||||
stop = threading.Event()
|
||||
start = time.time()
|
||||
|
||||
def _tick():
|
||||
while not stop.wait(interval):
|
||||
elapsed = int(time.time() - start)
|
||||
print(f" [{elapsed}s] {label}...", file=file, flush=True)
|
||||
|
||||
t = threading.Thread(target=_tick, daemon=True)
|
||||
t.start()
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
stop.set()
|
||||
t.join(timeout=0.5)
|
||||
|
||||
|
||||
def output_dir(customer: str) -> Path:
|
||||
"""Zwraca ścieżkę do folderu danych klienta, tworzy jeśli nie istnieje."""
|
||||
# Próbuj znaleźć katalog po domenie
|
||||
if not re.fullmatch(r"[\d-]+", customer):
|
||||
d = ROOT / "clients" / customer
|
||||
else:
|
||||
# Dla ID szukaj folderu po wartości z .env
|
||||
d = ROOT / "clients" / customer
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d
|
||||
Reference in New Issue
Block a user