from __future__ import annotations import argparse import csv import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from src.gads_v2.config import load_config, load_env from src.gads_v2.google_ads import get_google_ads_client, run_query from src.gads_v2.tasks.pla_cl1_sync import fetch_adspro_products, parse_allowed_labels def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Podsumowanie produktow adsPRO wg CL1 i wynikow Google Ads." ) parser.add_argument("client", help="Domena klienta, np. laitica.pl") parser.add_argument( "--sales-csv", help="CSV z eksportem historii sprzedazy produktow Google Ads.", ) parser.add_argument( "--output", help="Sciezka wynikowego CSV z podsumowaniem CL1.", ) parser.add_argument( "--top-per-cl1", type=int, default=20, help="Ile najlepszych produktow w kazdym CL1 ma trafic do kolumny spelnia.", ) return parser.parse_args() def parse_number(value: str) -> float: text = str(value or "").strip().replace("\u00a0", "").replace(" ", "") if "," in text and "." in text: text = text.replace(".", "").replace(",", ".") else: text = text.replace(",", ".") return float(text or 0) def csv_cell(value: object) -> str: text = str(value or "") if any(char in text for char in ['"', ";", "\r", "\n"]): return '"' + text.replace('"', '""') + '"' return text def excel_number(value: int | float, decimals: int = 0) -> str: if decimals <= 0: return str(int(value or 0)) text = f"{float(value or 0):.{decimals}f}".rstrip("0").rstrip(".") return text.replace(".", ",") def find_sales_csv(domain: str) -> Path: data_dir = ROOT / "clients" / domain / "data" candidates = sorted(data_dir.glob("google_ads_product_sales_history_*.csv"), key=lambda path: path.stat().st_mtime) if not candidates: raise FileNotFoundError(f"Nie znaleziono CSV Google Ads w {data_dir}.") return candidates[-1] def read_sales(path: Path) -> dict[str, dict]: sample = path.read_text(encoding="utf-8-sig").splitlines()[0] delimiter = ";" if ";" in sample else "," sales: dict[str, dict] = {} with path.open(newline="", encoding="utf-8-sig") as handle: reader = csv.DictReader(handle, delimiter=delimiter) for row in reader: product_id = (row.get("product_id") or "").strip() if not product_id: continue sales[product_id] = { "conversions": parse_number(row.get("conversions", "")), "conversion_value": parse_number(row.get("conversion_value", "")), "cost": parse_number(row.get("cost", "")), "roas": parse_number(row.get("roas", "")), } return sales def fetch_cl1_segments(client_config) -> list[str]: google_client = get_google_ads_client(use_proto_plus=True) rows = run_query( google_client, client_config.safe_customer_id, """ SELECT campaign.id, campaign.name, campaign.status FROM campaign WHERE campaign.name LIKE '%PLA_CL1%' AND campaign.status = 'ENABLED' """, ) return sorted({label for row in rows for label in parse_allowed_labels(row.campaign.name)}) def write_summary(path: Path, rows: list[dict]) -> None: headers = [ "cl1", "produkty_adspro", "produkty_z_danymi_google_ads", "spelnia_top_produkty", "nie_spelnia_warunku", "konwersje_lacznie", "wartosc_konwersji_lacznie", "koszt_lacznie", "roas_lacznie", ] lines = [";".join(headers)] for row in rows: lines.append( ";".join( [ csv_cell(row["cl1"]), excel_number(row["produkty_adspro"]), excel_number(row["produkty_z_danymi_google_ads"]), excel_number(row["spelnia_warunek"]), excel_number(row["nie_spelnia_warunku"]), excel_number(row["konwersje_lacznie"], 4), excel_number(row["wartosc_konwersji_lacznie"], 2), excel_number(row["koszt_lacznie"], 2), excel_number(row["roas_lacznie"], 4), ] ) ) path.write_text("\n".join(lines) + "\n", encoding="utf-8-sig") def split_products_by_top(products: list[dict], sales: dict[str, dict], top_per_cl1: int) -> tuple[list[dict], list[dict]]: products_by_cl1: dict[str, list[dict]] = {} for product in products: cl1 = str(product.get("custom_label_1") or "(brak CL1)").strip() or "(brak CL1)" offer_id = str(product.get("offer_id") or "").strip() stats = sales.get(offer_id, {}) products_by_cl1.setdefault(cl1, []).append( { **product, "conversions": float(stats.get("conversions", 0.0)), "conversion_value": float(stats.get("conversion_value", 0.0)), "cost": float(stats.get("cost", 0.0)), "roas": float(stats.get("roas", 0.0)), } ) top_products = [] catch_all_products = [] for product_rows in products_by_cl1.values(): ranked = sorted( product_rows, key=lambda item: ( item["conversions"], item["conversion_value"], item["roas"], -item["cost"], ), reverse=True, ) top_products.extend(ranked[:top_per_cl1]) catch_all_products.extend(ranked[top_per_cl1:]) return top_products, catch_all_products def print_table(rows: list[dict]) -> None: headers = ["CL1", "Produkty", "Z danymi", "Spełnia", "Nie spełnia", "ROAS łącznie"] table_rows = [ [ row["cl1"], str(row["produkty_adspro"]), str(row["produkty_z_danymi_google_ads"]), str(row["spelnia_warunek"]), str(row["nie_spelnia_warunku"]), excel_number(row["roas_lacznie"], 2), ] for row in rows ] widths = [ max(len(str(item)) for item in [header] + [row[index] for row in table_rows]) for index, header in enumerate(headers) ] border = "+" + "+".join("-" * (width + 2) for width in widths) + "+" sep = "+" + "+".join("-" * (width + 2) for width in widths) + "+" bottom = "+" + "+".join("-" * (width + 2) for width in widths) + "+" print(border) print("| " + " | ".join(header.ljust(widths[index]) for index, header in enumerate(headers)) + " |") print(sep) for row in table_rows: print("| " + " | ".join(cell.ljust(widths[index]) for index, cell in enumerate(row)) + " |") print(bottom) def main() -> None: args = parse_args() load_env(ROOT / ".env") config = load_config() if args.client not in config.clients: known = ", ".join(sorted(config.clients)) raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni: {known}") client_config = config.clients[args.client] sales_path = Path(args.sales_csv) if args.sales_csv else find_sales_csv(args.client) output_path = ( Path(args.output) if args.output else ROOT / "clients" / args.client / "data" / "google_ads_product_sales_by_cl1_summary.csv" ) output_path.parent.mkdir(parents=True, exist_ok=True) sales = read_sales(sales_path) segments = fetch_cl1_segments(client_config) if not segments: raise SystemExit("Nie znaleziono aktywnych kampanii [PLA_CL1], z ktorych mozna odczytac CL1.") print("CL1 z aktywnych kampanii PLA_CL1: " + ", ".join(segments)) products = fetch_adspro_products(client_config, segments) summary: dict[str, dict] = {} top_products, catch_all_products = split_products_by_top(products, sales, args.top_per_cl1) catch_all_ids = {product["offer_id"] for product in catch_all_products} for product in products: cl1 = str(product.get("custom_label_1") or "(brak CL1)").strip() or "(brak CL1)" offer_id = str(product.get("offer_id") or "").strip() stats = sales.get(offer_id, {}) row = summary.setdefault( cl1, { "cl1": cl1, "produkty_adspro": 0, "produkty_z_danymi_google_ads": 0, "spelnia_warunek": 0, "nie_spelnia_warunku": 0, "konwersje_lacznie": 0.0, "wartosc_konwersji_lacznie": 0.0, "koszt_lacznie": 0.0, "roas_lacznie": 0.0, }, ) conversions = float(stats.get("conversions", 0.0)) conversion_value = float(stats.get("conversion_value", 0.0)) cost = float(stats.get("cost", 0.0)) roas = float(stats.get("roas", 0.0)) row["produkty_adspro"] += 1 if offer_id in sales: row["produkty_z_danymi_google_ads"] += 1 if offer_id in catch_all_ids: row["nie_spelnia_warunku"] += 1 else: row["spelnia_warunek"] += 1 row["konwersje_lacznie"] += conversions row["wartosc_konwersji_lacznie"] += conversion_value row["koszt_lacznie"] += cost rows = sorted(summary.values(), key=lambda item: item["spelnia_warunek"], reverse=True) for row in rows: row["roas_lacznie"] = ( row["wartosc_konwersji_lacznie"] / row["koszt_lacznie"] if row["koszt_lacznie"] else 0.0 ) write_summary(output_path, rows) print_table(rows) print(f"\nCSV: {output_path}") print(f"Produkty adsPRO: {sum(row['produkty_adspro'] for row in rows)}") print(f"Plik sprzedazy Google Ads: {sales_path}") if __name__ == "__main__": main()