"""Export historical per-product Google Ads results to a CSV file.

The script queries ``shopping_performance_view`` one calendar-year chunk at a
time, sums the metrics per product item id, and writes a semicolon-delimited,
comma-decimal, UTF-8-with-BOM CSV.
"""
from __future__ import annotations

import argparse
import sys
from collections import Counter, defaultdict
from datetime import date
from pathlib import Path

# Put the directory one level above this script's folder on sys.path so the
# ``src`` package imports below resolve when the file is run as a script.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))

from src.gads_v2.config import load_config, load_env
from src.gads_v2.google_ads import get_google_ads_client, run_query

# Column order of the exported CSV; write_excel_csv emits cells in this order.
CSV_HEADERS = [
    "product_id",
    "product_name",
    "conversions",
    "conversion_value",
    "cost",
    "roas",
    "impressions",
    "clicks",
]


def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments.

    Returns:
        Namespace with ``client`` (str), ``start`` and ``end`` (ISO date
        strings, not yet validated) and ``output`` (str or None).
    """
    parser = argparse.ArgumentParser(
        description="Eksport historycznych wynikow produktow z Google Ads API do CSV."
    )
    parser.add_argument("client", help="Domena klienta z config/clients.toml, np. laitica.pl")
    parser.add_argument("--start", default="2000-01-01", help="Data poczatkowa YYYY-MM-DD")
    parser.add_argument("--end", default=date.today().isoformat(), help="Data koncowa YYYY-MM-DD")
    parser.add_argument("--output", help="Sciezka wynikowego pliku CSV")
    return parser.parse_args()


def year_ranges(start: date, end: date) -> list[tuple[date, date]]:
    """Split ``[start, end]`` into inclusive chunks that never cross a year boundary.

    Args:
        start: First day of the overall range.
        end: Last day of the overall range (inclusive).

    Returns:
        A list of ``(chunk_start, chunk_end)`` pairs in chronological order;
        empty when ``start`` is later than ``end``.
    """
    ranges: list[tuple[date, date]] = []
    current = start
    while current <= end:
        year_end = min(date(current.year, 12, 31), end)
        ranges.append((current, year_end))
        if year_end >= end:
            # Last chunk reached. Stopping here also avoids constructing
            # date(year + 1, 1, 1) past date.max when end is in year 9999.
            break
        current = date(current.year + 1, 1, 1)
    return ranges


def as_float(value: object) -> float:
    """Convert ``value`` to float, treating any falsy value (None, 0, "") as 0."""
    return float(value or 0)


def as_int(value: object) -> int:
    """Convert ``value`` to int, treating any falsy value (None, 0, "") as 0."""
    return int(value or 0)


def excel_number(value: int | float, decimals: int = 0) -> str:
    """Format a number as CSV cell text using a decimal comma.

    Args:
        value: Number to format; falsy values are rendered as zero.
        decimals: Maximum number of fractional digits. When zero or negative
            the value is truncated to an integer.

    Returns:
        The formatted text, with trailing fractional zeros (and a dangling
        separator) removed and ``.`` replaced by ``,``.
    """
    if decimals <= 0:
        return str(int(value or 0))
    text = f"{float(value or 0):.{decimals}f}".rstrip("0").rstrip(".")
    return text.replace(".", ",")


def csv_cell(value: object) -> str:
    """Render ``value`` as one semicolon-delimited CSV field.

    Falsy values become an empty field. The text is wrapped in double quotes
    (with embedded quotes doubled) only when it contains a quote, a semicolon,
    or a line break.
    """
    text = str(value or "")
    if any(char in text for char in ['"', ";", "\r", "\n"]):
        return '"' + text.replace('"', '""') + '"'
    return text


def write_excel_csv(path: Path, rows: list[dict]) -> None:
    """Write ``rows`` to ``path`` as a semicolon-delimited CSV.

    Args:
        path: Destination file; overwritten if it exists.
        rows: Records containing every key listed in ``CSV_HEADERS``.
    """
    lines = [";".join(CSV_HEADERS)]
    for row in rows:
        lines.append(
            ";".join(
                [
                    csv_cell(row["product_id"]),
                    csv_cell(row["product_name"]),
                    excel_number(row["conversions"], 4),
                    excel_number(row["conversion_value"], 2),
                    excel_number(row["cost"], 2),
                    excel_number(row["roas"], 4),
                    excel_number(row["impressions"]),
                    excel_number(row["clicks"]),
                ]
            )
        )
    # "utf-8-sig" prepends a BOM to the file.
    path.write_text("\n".join(lines) + "\n", encoding="utf-8-sig")


def _build_query(chunk_start: date, chunk_end: date) -> str:
    """Return the query selecting per-product metrics for one date chunk."""
    # Both bounds come from date.isoformat(), so the interpolated values are
    # always well-formed YYYY-MM-DD literals.
    return f"""
        SELECT
            segments.product_item_id,
            segments.product_title,
            metrics.impressions,
            metrics.clicks,
            metrics.cost_micros,
            metrics.conversions,
            metrics.conversions_value
        FROM shopping_performance_view
        WHERE segments.date BETWEEN '{chunk_start.isoformat()}' AND '{chunk_end.isoformat()}'
    """


def _accumulate_rows(
    rows,
    records: dict[str, dict],
    title_votes: dict[str, Counter[str]],
) -> None:
    """Add the metrics of each result row to the per-product totals in ``records``.

    Rows without a product item id are skipped. Every non-empty title seen for
    a product is counted in ``title_votes``.
    """
    for row in rows:
        product_id = str(row.segments.product_item_id or "").strip()
        if not product_id:
            continue
        title = str(row.segments.product_title or "").strip()
        record = records.setdefault(
            product_id,
            {
                "product_id": product_id,
                "product_name": title,
                "impressions": 0,
                "clicks": 0,
                "cost": 0.0,
                "conversions": 0.0,
                "conversion_value": 0.0,
                "roas": 0.0,
            },
        )
        if title:
            title_votes[product_id][title] += 1
        record["impressions"] += as_int(row.metrics.impressions)
        record["clicks"] += as_int(row.metrics.clicks)
        # cost_micros is divided by one million to get the cost column value.
        record["cost"] += as_float(row.metrics.cost_micros) / 1_000_000
        record["conversions"] += as_float(row.metrics.conversions)
        record["conversion_value"] += as_float(row.metrics.conversions_value)


def _finalize_records(
    records: dict[str, dict],
    title_votes: dict[str, Counter[str]],
) -> None:
    """Pick the final product name, compute ROAS and round the totals in place."""
    for product_id, record in records.items():
        # .get() avoids inserting empty counters into a defaultdict.
        votes = title_votes.get(product_id)
        if votes:
            # Use the title that appeared most often across all rows.
            record["product_name"] = votes.most_common(1)[0][0]
        cost = record["cost"]
        conversion_value = record["conversion_value"]
        # ROAS is derived from the unrounded totals so that rounding of the
        # displayed columns does not leak into the ratio.
        record["roas"] = round(conversion_value / cost, 4) if cost else 0.0
        record["cost"] = round(cost, 2)
        record["conversions"] = round(record["conversions"], 4)
        record["conversion_value"] = round(conversion_value, 2)


def main() -> None:
    """Run the export: validate input, fetch and aggregate rows, write the CSV.

    Raises:
        SystemExit: If the client is unknown, a date is not a valid ISO date,
            or ``--start`` is later than ``--end``.
    """
    args = parse_args()
    load_env(ROOT / ".env")
    config = load_config()
    if args.client not in config.clients:
        known = ", ".join(sorted(config.clients))
        raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni klienci: {known}")

    try:
        start = date.fromisoformat(args.start)
        end = date.fromisoformat(args.end)
    except ValueError as exc:
        # Report malformed dates like the other input errors instead of
        # letting a traceback escape.
        raise SystemExit(f"Niepoprawna data (oczekiwany format YYYY-MM-DD): {exc}") from exc
    if start > end:
        raise SystemExit("--start nie moze byc pozniej niz --end")

    client_config = config.clients[args.client]
    output = (
        Path(args.output)
        if args.output
        else ROOT
        / "clients"
        / args.client
        / "data"
        / f"google_ads_product_sales_history_{start}_{end}.csv"
    )
    output.parent.mkdir(parents=True, exist_ok=True)

    google_client = get_google_ads_client(use_proto_plus=True)
    records: dict[str, dict] = {}
    title_votes: dict[str, Counter[str]] = defaultdict(Counter)

    # One request per calendar-year chunk of the requested range.
    for chunk_start, chunk_end in year_ranges(start, end):
        print(f"Pobieram {chunk_start} - {chunk_end}...", flush=True)
        query = _build_query(chunk_start, chunk_end)
        rows = run_query(google_client, client_config.safe_customer_id, query, timeout=300.0)
        _accumulate_rows(rows, records, title_votes)

    _finalize_records(records, title_votes)

    # Highest conversion value first; conversions and cost break ties.
    sorted_rows = sorted(
        records.values(),
        key=lambda item: (item["conversion_value"], item["conversions"], item["cost"]),
        reverse=True,
    )
    write_excel_csv(output, sorted_rows)
    print(f"Zapisano {len(sorted_rows)} produktow: {output}")


if __name__ == "__main__":
    main()