174 lines
5.8 KiB
Python
174 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from datetime import date
|
|
from pathlib import Path
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from src.gads_v2.config import load_config, load_env
|
|
from src.gads_v2.google_ads import get_google_ads_client, run_query
|
|
|
|
|
|
CSV_HEADERS = [
|
|
"product_id",
|
|
"product_name",
|
|
"conversions",
|
|
"conversion_value",
|
|
"cost",
|
|
"roas",
|
|
"impressions",
|
|
"clicks",
|
|
]
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Eksport historycznych wynikow produktow z Google Ads API do CSV."
|
|
)
|
|
parser.add_argument("client", help="Domena klienta z config/clients.toml, np. laitica.pl")
|
|
parser.add_argument("--start", default="2000-01-01", help="Data poczatkowa YYYY-MM-DD")
|
|
parser.add_argument("--end", default=date.today().isoformat(), help="Data koncowa YYYY-MM-DD")
|
|
parser.add_argument("--output", help="Sciezka wynikowego pliku CSV")
|
|
return parser.parse_args()
|
|
|
|
|
|
def year_ranges(start: date, end: date) -> list[tuple[date, date]]:
|
|
ranges = []
|
|
current = start
|
|
while current <= end:
|
|
year_end = min(date(current.year, 12, 31), end)
|
|
ranges.append((current, year_end))
|
|
current = date(current.year + 1, 1, 1)
|
|
return ranges
|
|
|
|
|
|
def as_float(value: object) -> float:
|
|
return float(value or 0)
|
|
|
|
|
|
def as_int(value: object) -> int:
|
|
return int(value or 0)
|
|
|
|
|
|
def excel_number(value: int | float, decimals: int = 0) -> str:
|
|
if decimals <= 0:
|
|
return str(int(value or 0))
|
|
text = f"{float(value or 0):.{decimals}f}".rstrip("0").rstrip(".")
|
|
return text.replace(".", ",")
|
|
|
|
|
|
def csv_cell(value: object) -> str:
|
|
text = str(value or "")
|
|
if any(char in text for char in ['"', ";", "\r", "\n"]):
|
|
return '"' + text.replace('"', '""') + '"'
|
|
return text
|
|
|
|
|
|
def write_excel_csv(path: Path, rows: list[dict]) -> None:
|
|
lines = [";".join(CSV_HEADERS)]
|
|
for row in rows:
|
|
lines.append(
|
|
";".join(
|
|
[
|
|
csv_cell(row["product_id"]),
|
|
csv_cell(row["product_name"]),
|
|
excel_number(row["conversions"], 4),
|
|
excel_number(row["conversion_value"], 2),
|
|
excel_number(row["cost"], 2),
|
|
excel_number(row["roas"], 4),
|
|
excel_number(row["impressions"]),
|
|
excel_number(row["clicks"]),
|
|
]
|
|
)
|
|
)
|
|
path.write_text("\n".join(lines) + "\n", encoding="utf-8-sig")
|
|
|
|
|
|
def main() -> None:
|
|
args = parse_args()
|
|
load_env(ROOT / ".env")
|
|
config = load_config()
|
|
if args.client not in config.clients:
|
|
known = ", ".join(sorted(config.clients))
|
|
raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni klienci: {known}")
|
|
|
|
start = date.fromisoformat(args.start)
|
|
end = date.fromisoformat(args.end)
|
|
if start > end:
|
|
raise SystemExit("--start nie moze byc pozniej niz --end")
|
|
|
|
client_config = config.clients[args.client]
|
|
output = Path(args.output) if args.output else ROOT / "clients" / args.client / "data" / f"google_ads_product_sales_history_{start}_{end}.csv"
|
|
output.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
google_client = get_google_ads_client(use_proto_plus=True)
|
|
records: dict[str, dict] = {}
|
|
title_votes: dict[str, Counter[str]] = defaultdict(Counter)
|
|
|
|
for chunk_start, chunk_end in year_ranges(start, end):
|
|
print(f"Pobieram {chunk_start} - {chunk_end}...", flush=True)
|
|
query = f"""
|
|
SELECT
|
|
segments.product_item_id,
|
|
segments.product_title,
|
|
metrics.impressions,
|
|
metrics.clicks,
|
|
metrics.cost_micros,
|
|
metrics.conversions,
|
|
metrics.conversions_value
|
|
FROM shopping_performance_view
|
|
WHERE segments.date BETWEEN '{chunk_start.isoformat()}' AND '{chunk_end.isoformat()}'
|
|
"""
|
|
rows = run_query(google_client, client_config.safe_customer_id, query, timeout=300.0)
|
|
for row in rows:
|
|
product_id = str(row.segments.product_item_id or "").strip()
|
|
if not product_id:
|
|
continue
|
|
title = str(row.segments.product_title or "").strip()
|
|
record = records.setdefault(
|
|
product_id,
|
|
{
|
|
"product_id": product_id,
|
|
"product_name": title,
|
|
"impressions": 0,
|
|
"clicks": 0,
|
|
"cost": 0.0,
|
|
"conversions": 0.0,
|
|
"conversion_value": 0.0,
|
|
"roas": 0.0,
|
|
},
|
|
)
|
|
if title:
|
|
title_votes[product_id][title] += 1
|
|
record["impressions"] += as_int(row.metrics.impressions)
|
|
record["clicks"] += as_int(row.metrics.clicks)
|
|
record["cost"] += as_float(row.metrics.cost_micros) / 1_000_000
|
|
record["conversions"] += as_float(row.metrics.conversions)
|
|
record["conversion_value"] += as_float(row.metrics.conversions_value)
|
|
|
|
for product_id, record in records.items():
|
|
if title_votes[product_id]:
|
|
record["product_name"] = title_votes[product_id].most_common(1)[0][0]
|
|
cost = record["cost"]
|
|
record["cost"] = round(cost, 2)
|
|
record["conversions"] = round(record["conversions"], 4)
|
|
record["conversion_value"] = round(record["conversion_value"], 2)
|
|
record["roas"] = round(record["conversion_value"] / cost, 4) if cost else 0.0
|
|
|
|
sorted_rows = sorted(
|
|
records.values(),
|
|
key=lambda item: (item["conversion_value"], item["conversions"], item["cost"]),
|
|
reverse=True,
|
|
)
|
|
write_excel_csv(output, sorted_rows)
|
|
|
|
print(f"Zapisano {len(sorted_rows)} produktow: {output}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|