Files
google-ads-ver-2/scripts/export_product_sales_history.py
2026-05-15 09:28:11 +02:00

174 lines
5.8 KiB
Python

from __future__ import annotations
import argparse
import sys
from collections import Counter, defaultdict
from datetime import date
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from src.gads_v2.config import load_config, load_env
from src.gads_v2.google_ads import get_google_ads_client, run_query
CSV_HEADERS = [
"product_id",
"product_name",
"conversions",
"conversion_value",
"cost",
"roas",
"impressions",
"clicks",
]
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Eksport historycznych wynikow produktow z Google Ads API do CSV."
)
parser.add_argument("client", help="Domena klienta z config/clients.toml, np. laitica.pl")
parser.add_argument("--start", default="2000-01-01", help="Data poczatkowa YYYY-MM-DD")
parser.add_argument("--end", default=date.today().isoformat(), help="Data koncowa YYYY-MM-DD")
parser.add_argument("--output", help="Sciezka wynikowego pliku CSV")
return parser.parse_args()
def year_ranges(start: date, end: date) -> list[tuple[date, date]]:
ranges = []
current = start
while current <= end:
year_end = min(date(current.year, 12, 31), end)
ranges.append((current, year_end))
current = date(current.year + 1, 1, 1)
return ranges
def as_float(value: object) -> float:
return float(value or 0)
def as_int(value: object) -> int:
return int(value or 0)
def excel_number(value: int | float, decimals: int = 0) -> str:
if decimals <= 0:
return str(int(value or 0))
text = f"{float(value or 0):.{decimals}f}".rstrip("0").rstrip(".")
return text.replace(".", ",")
def csv_cell(value: object) -> str:
text = str(value or "")
if any(char in text for char in ['"', ";", "\r", "\n"]):
return '"' + text.replace('"', '""') + '"'
return text
def write_excel_csv(path: Path, rows: list[dict]) -> None:
lines = [";".join(CSV_HEADERS)]
for row in rows:
lines.append(
";".join(
[
csv_cell(row["product_id"]),
csv_cell(row["product_name"]),
excel_number(row["conversions"], 4),
excel_number(row["conversion_value"], 2),
excel_number(row["cost"], 2),
excel_number(row["roas"], 4),
excel_number(row["impressions"]),
excel_number(row["clicks"]),
]
)
)
path.write_text("\n".join(lines) + "\n", encoding="utf-8-sig")
def main() -> None:
args = parse_args()
load_env(ROOT / ".env")
config = load_config()
if args.client not in config.clients:
known = ", ".join(sorted(config.clients))
raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni klienci: {known}")
start = date.fromisoformat(args.start)
end = date.fromisoformat(args.end)
if start > end:
raise SystemExit("--start nie moze byc pozniej niz --end")
client_config = config.clients[args.client]
output = Path(args.output) if args.output else ROOT / "clients" / args.client / "data" / f"google_ads_product_sales_history_{start}_{end}.csv"
output.parent.mkdir(parents=True, exist_ok=True)
google_client = get_google_ads_client(use_proto_plus=True)
records: dict[str, dict] = {}
title_votes: dict[str, Counter[str]] = defaultdict(Counter)
for chunk_start, chunk_end in year_ranges(start, end):
print(f"Pobieram {chunk_start} - {chunk_end}...", flush=True)
query = f"""
SELECT
segments.product_item_id,
segments.product_title,
metrics.impressions,
metrics.clicks,
metrics.cost_micros,
metrics.conversions,
metrics.conversions_value
FROM shopping_performance_view
WHERE segments.date BETWEEN '{chunk_start.isoformat()}' AND '{chunk_end.isoformat()}'
"""
rows = run_query(google_client, client_config.safe_customer_id, query, timeout=300.0)
for row in rows:
product_id = str(row.segments.product_item_id or "").strip()
if not product_id:
continue
title = str(row.segments.product_title or "").strip()
record = records.setdefault(
product_id,
{
"product_id": product_id,
"product_name": title,
"impressions": 0,
"clicks": 0,
"cost": 0.0,
"conversions": 0.0,
"conversion_value": 0.0,
"roas": 0.0,
},
)
if title:
title_votes[product_id][title] += 1
record["impressions"] += as_int(row.metrics.impressions)
record["clicks"] += as_int(row.metrics.clicks)
record["cost"] += as_float(row.metrics.cost_micros) / 1_000_000
record["conversions"] += as_float(row.metrics.conversions)
record["conversion_value"] += as_float(row.metrics.conversions_value)
for product_id, record in records.items():
if title_votes[product_id]:
record["product_name"] = title_votes[product_id].most_common(1)[0][0]
cost = record["cost"]
record["cost"] = round(cost, 2)
record["conversions"] = round(record["conversions"], 4)
record["conversion_value"] = round(record["conversion_value"], 2)
record["roas"] = round(record["conversion_value"] / cost, 4) if cost else 0.0
sorted_rows = sorted(
records.values(),
key=lambda item: (item["conversion_value"], item["conversions"], item["cost"]),
reverse=True,
)
write_excel_csv(output, sorted_rows)
print(f"Zapisano {len(sorted_rows)} produktow: {output}")
if __name__ == "__main__":
main()