# Files
# google-ads-ver-2/scripts/product_cl1_sales_summary.py
# 2026-05-15 09:28:11 +02:00
#
# 275 lines
# 9.7 KiB
# Python

from __future__ import annotations
import argparse
import csv
import sys
from pathlib import Path
# ROOT is the parent of the directory that contains this file. It is put first
# on sys.path before the `src.gads_v2` imports below — presumably so they
# resolve when the script is executed directly (verify against the repo layout).
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from src.gads_v2.config import load_config, load_env
from src.gads_v2.google_ads import get_google_ads_client, run_query
from src.gads_v2.tasks.pla_cl1_sync import fetch_adspro_products, parse_allowed_labels
def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of the CL1 summary script.

    Returns:
        Namespace with ``client`` (positional), ``sales_csv`` and ``output``
        (optional paths, ``None`` when omitted) and ``top_per_cl1`` (int,
        default 20).

    Raises:
        SystemExit: On an argparse usage error, including a negative
            ``--top-per-cl1``.
    """
    parser = argparse.ArgumentParser(
        description="Podsumowanie produktow adsPRO wg CL1 i wynikow Google Ads."
    )
    parser.add_argument("client", help="Domena klienta, np. laitica.pl")
    parser.add_argument(
        "--sales-csv",
        help="CSV z eksportem historii sprzedazy produktow Google Ads.",
    )
    parser.add_argument(
        "--output",
        help="Sciezka wynikowego CSV z podsumowaniem CL1.",
    )
    parser.add_argument(
        "--top-per-cl1",
        type=int,
        default=20,
        help="Ile najlepszych produktow w kazdym CL1 ma trafic do kolumny spelnia.",
    )
    args = parser.parse_args()
    if args.top_per_cl1 < 0:
        # The value is used as a slice bound when splitting products; a
        # negative bound would silently move the *worst* products out of the
        # top group instead of limiting its size.
        parser.error("--top-per-cl1 nie moze byc ujemne.")
    return args
def parse_number(value: str) -> float:
    """Convert a localized numeric cell into a float.

    Regular, no-break and narrow no-break spaces are removed. When both ``,``
    and ``.`` occur, whichever appears last is the decimal mark and the other
    one is a thousands separator, so ``1.234,56`` and ``1,234.56`` both give
    ``1234.56``. A separator that occurs more than once on its own
    (``1,234,567``) is treated as grouping. A single lone comma is a decimal
    comma (``12,5`` -> ``12.5``).

    Args:
        value: Raw cell content; ``None`` and other falsy values count as empty.

    Returns:
        The parsed number, ``0.0`` for an empty cell.

    Raises:
        ValueError: If the cleaned text is not a valid number.
    """
    text = str(value or "").strip()
    for blank in ("\u00a0", "\u202f", " "):
        text = text.replace(blank, "")
    if not text:
        return 0.0

    last_comma = text.rfind(",")
    last_dot = text.rfind(".")
    if last_comma != -1 and last_dot != -1:
        # Mixed separators: the right-most one is the decimal mark.
        if last_comma > last_dot:
            text = text.replace(".", "").replace(",", ".")
        else:
            text = text.replace(",", "")
    elif text.count(",") > 1:
        # Several commas and no dot can only be thousands grouping.
        text = text.replace(",", "")
    elif text.count(".") > 1:
        text = text.replace(".", "")
    else:
        text = text.replace(",", ".")
    return float(text)
def csv_cell(value: object) -> str:
    """Render a value as one cell of a semicolon-separated CSV line.

    Args:
        value: Any object; ``None`` becomes an empty cell, everything else is
            converted with ``str``. Falsy values such as ``0`` are kept
            instead of being blanked.

    Returns:
        The text, wrapped in double quotes (with inner quotes doubled) when it
        contains a quote, a semicolon, or a line break.
    """
    text = "" if value is None else str(value)
    if any(char in text for char in ('"', ";", "\r", "\n")):
        return '"' + text.replace('"', '""') + '"'
    return text
def excel_number(value: int | float, decimals: int = 0) -> str:
    """Format a number with a decimal comma for the semicolon-separated CSV.

    Args:
        value: Number to format; falsy values (including ``None``) count as 0.
        decimals: Maximum number of fractional digits. With ``decimals <= 0``
            the value is truncated with ``int`` rather than rounded.

    Returns:
        The integer text, or the fixed-point text with trailing zeros (and a
        dangling separator) removed and ``.`` replaced by ``,``.
    """
    number = value or 0
    if decimals > 0:
        fixed = format(float(number), f".{decimals}f")
        # Drop insignificant zeros first, then a separator left without digits.
        trimmed = fixed.rstrip("0").rstrip(".")
        return trimmed.replace(".", ",")
    return str(int(number))
def find_sales_csv(domain: str) -> Path:
    """Return the most recently modified sales-history CSV of a client.

    Looks in ``ROOT / "clients" / domain / "data"`` for files named
    ``google_ads_product_sales_history_*.csv``.

    Args:
        domain: Client directory name.

    Returns:
        Path of the matching file with the greatest modification time.

    Raises:
        FileNotFoundError: If no file matches the pattern.
    """
    data_dir = ROOT / "clients" / domain / "data"
    matches = list(data_dir.glob("google_ads_product_sales_history_*.csv"))
    # Stable ascending sort by mtime; the newest file ends up last.
    matches.sort(key=lambda candidate: candidate.stat().st_mtime)
    if len(matches) == 0:
        raise FileNotFoundError(f"Nie znaleziono CSV Google Ads w {data_dir}.")
    return matches[-1]
def read_sales(path: Path) -> dict[str, dict]:
    """Load per-product metrics from a Google Ads sales-history CSV.

    The delimiter is ``;`` when the first line contains a semicolon and ``,``
    otherwise. Rows with an empty ``product_id`` are skipped; when a
    ``product_id`` repeats, the last row wins. An empty file yields an empty
    mapping.

    Args:
        path: CSV file encoded as UTF-8, with or without a BOM.

    Returns:
        Mapping of ``product_id`` to a dict with float values under the keys
        ``conversions``, ``conversion_value``, ``cost`` and ``roas``.

    Raises:
        ValueError: If a metric cell cannot be parsed (raised by
            ``parse_number``).
    """
    sales: dict[str, dict] = {}
    with path.open(newline="", encoding="utf-8-sig") as handle:
        # Sniff the delimiter from the first line, then rewind so DictReader
        # sees the header; readline() returns "" for an empty file.
        first_line = handle.readline()
        delimiter = ";" if ";" in first_line else ","
        handle.seek(0)
        reader = csv.DictReader(handle, delimiter=delimiter)
        for row in reader:
            product_id = (row.get("product_id") or "").strip()
            if not product_id:
                continue
            sales[product_id] = {
                "conversions": parse_number(row.get("conversions", "")),
                "conversion_value": parse_number(row.get("conversion_value", "")),
                "cost": parse_number(row.get("cost", "")),
                "roas": parse_number(row.get("roas", "")),
            }
    return sales
def fetch_cl1_segments(client_config) -> list[str]:
    """Collect the CL1 labels referenced by enabled PLA_CL1 campaigns.

    Queries enabled campaigns whose name matches ``%PLA_CL1%`` and passes each
    campaign name to ``parse_allowed_labels``.

    Args:
        client_config: Client configuration; only ``safe_customer_id`` is read.

    Returns:
        Sorted list of the distinct labels extracted from the campaign names.
    """
    # The query text is passed to run_query verbatim.
    query = """
SELECT campaign.id, campaign.name, campaign.status
FROM campaign
WHERE campaign.name LIKE '%PLA_CL1%'
AND campaign.status = 'ENABLED'
"""
    ads_client = get_google_ads_client(use_proto_plus=True)
    campaign_rows = run_query(ads_client, client_config.safe_customer_id, query)
    labels = set()
    for campaign_row in campaign_rows:
        labels.update(parse_allowed_labels(campaign_row.campaign.name))
    return sorted(labels)
def write_summary(path: Path, rows: list[dict]) -> None:
    """Write the per-CL1 summary as a semicolon-separated, BOM-prefixed CSV.

    Args:
        path: Destination file; it is overwritten.
        rows: Summary records holding the keys listed in the layout below.
    """
    # (column header, key in the record, decimals) — None marks the text
    # column that goes through csv_cell instead of excel_number.
    layout = [
        ("cl1", "cl1", None),
        ("produkty_adspro", "produkty_adspro", 0),
        ("produkty_z_danymi_google_ads", "produkty_z_danymi_google_ads", 0),
        ("spelnia_top_produkty", "spelnia_warunek", 0),
        ("nie_spelnia_warunku", "nie_spelnia_warunku", 0),
        ("konwersje_lacznie", "konwersje_lacznie", 4),
        ("wartosc_konwersji_lacznie", "wartosc_konwersji_lacznie", 2),
        ("koszt_lacznie", "koszt_lacznie", 2),
        ("roas_lacznie", "roas_lacznie", 4),
    ]

    def render(record: dict, key: str, decimals) -> str:
        if decimals is None:
            return csv_cell(record[key])
        return excel_number(record[key], decimals)

    output = [";".join(header for header, _key, _decimals in layout)]
    for record in rows:
        cells = [render(record, key, decimals) for _header, key, decimals in layout]
        output.append(";".join(cells))
    path.write_text("\n".join(output) + "\n", encoding="utf-8-sig")
def split_products_by_top(products: list[dict], sales: dict[str, dict], top_per_cl1: int) -> tuple[list[dict], list[dict]]:
    """Split products into the best performers per CL1 and the remainder.

    Each product is copied and extended with float ``conversions``,
    ``conversion_value``, ``cost`` and ``roas`` taken from ``sales`` (0.0 when
    the stripped ``offer_id`` has no entry). Products are grouped by their
    stripped ``custom_label_1`` (``"(brak CL1)"`` when missing or blank) and
    ranked within a group by conversions, conversion value and ROAS
    descending, then cost ascending; ties keep their input order.

    Args:
        products: Product records.
        sales: Metrics keyed by offer id.
        top_per_cl1: Number of leading products per group to return as "top".

    Returns:
        ``(top_products, catch_all_products)``, each concatenated group by
        group in order of first appearance of the group.
    """
    metric_names = ("conversions", "conversion_value", "cost", "roas")

    def ranking(item: dict) -> tuple:
        return (
            item["conversions"],
            item["conversion_value"],
            item["roas"],
            -item["cost"],
        )

    groups: dict[str, list[dict]] = {}
    for product in products:
        label = str(product.get("custom_label_1") or "(brak CL1)").strip()
        if not label:
            label = "(brak CL1)"
        stats = sales.get(str(product.get("offer_id") or "").strip(), {})
        enriched = dict(product)
        for metric in metric_names:
            enriched[metric] = float(stats.get(metric, 0.0))
        if label not in groups:
            groups[label] = []
        groups[label].append(enriched)

    best: list[dict] = []
    remainder: list[dict] = []
    for members in groups.values():
        # list.sort is stable even with reverse=True, so ties keep input order.
        members.sort(key=ranking, reverse=True)
        best += members[:top_per_cl1]
        remainder += members[top_per_cl1:]
    return best, remainder
def print_table(rows: list[dict]) -> None:
    """Print the summary rows to stdout as an ASCII table.

    Args:
        rows: Summary records with the keys ``cl1``, ``produkty_adspro``,
            ``produkty_z_danymi_google_ads``, ``spelnia_warunek``,
            ``nie_spelnia_warunku`` and ``roas_lacznie``.
    """
    headers = ["CL1", "Produkty", "Z danymi", "Spełnia", "Nie spełnia", "ROAS łącznie"]
    body = []
    for row in rows:
        body.append(
            [
                row["cl1"],
                str(row["produkty_adspro"]),
                str(row["produkty_z_danymi_google_ads"]),
                str(row["spelnia_warunek"]),
                str(row["nie_spelnia_warunku"]),
                excel_number(row["roas_lacznie"], 2),
            ]
        )

    # Each column is as wide as its longest cell, header included.
    widths = []
    for index, header in enumerate(headers):
        column = [header]
        column.extend(line[index] for line in body)
        widths.append(max(len(str(cell)) for cell in column))

    # The top, middle and bottom rules are the same string.
    rule = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    def render(cells: list) -> str:
        padded = (cell.ljust(width) for cell, width in zip(cells, widths))
        return "| " + " | ".join(padded) + " |"

    print(rule)
    print(render(headers))
    print(rule)
    for line in body:
        print(render(line))
    print(rule)
def _cl1_label(product: dict) -> str:
    """Return the product's stripped custom_label_1, or the "(brak CL1)" placeholder."""
    return str(product.get("custom_label_1") or "(brak CL1)").strip() or "(brak CL1)"


def _summarize_by_cl1(products: list[dict], sales: dict[str, dict], catch_all_products: list[dict]) -> list[dict]:
    """Aggregate product counts and Google Ads totals per CL1.

    Args:
        products: All products, in the order they should be aggregated.
        sales: Metrics keyed by offer id.
        catch_all_products: The products that did not make the per-CL1 top
            list; only their CL1 label is read.

    Returns:
        One record per CL1, sorted by ``spelnia_warunek`` descending (ties
        keep first-appearance order), with ``roas_lacznie`` computed as total
        conversion value divided by total cost (0.0 when the cost is zero).
    """
    # Count the catch-all products per CL1 directly from the split result
    # rather than matching offer ids, so ids that are missing, duplicated or
    # differ only by whitespace/type cannot be misclassified.
    catch_all_counts: dict[str, int] = {}
    for product in catch_all_products:
        label = _cl1_label(product)
        catch_all_counts[label] = catch_all_counts.get(label, 0) + 1

    summary: dict[str, dict] = {}
    for product in products:
        cl1 = _cl1_label(product)
        offer_id = str(product.get("offer_id") or "").strip()
        stats = sales.get(offer_id, {})
        row = summary.setdefault(
            cl1,
            {
                "cl1": cl1,
                "produkty_adspro": 0,
                "produkty_z_danymi_google_ads": 0,
                "spelnia_warunek": 0,
                "nie_spelnia_warunku": 0,
                "konwersje_lacznie": 0.0,
                "wartosc_konwersji_lacznie": 0.0,
                "koszt_lacznie": 0.0,
                "roas_lacznie": 0.0,
            },
        )
        row["produkty_adspro"] += 1
        if offer_id in sales:
            row["produkty_z_danymi_google_ads"] += 1
        row["konwersje_lacznie"] += float(stats.get("conversions", 0.0))
        row["wartosc_konwersji_lacznie"] += float(stats.get("conversion_value", 0.0))
        row["koszt_lacznie"] += float(stats.get("cost", 0.0))

    for cl1, row in summary.items():
        row["nie_spelnia_warunku"] = catch_all_counts.get(cl1, 0)
        row["spelnia_warunek"] = row["produkty_adspro"] - row["nie_spelnia_warunku"]
        cost = row["koszt_lacznie"]
        row["roas_lacznie"] = row["wartosc_konwersji_lacznie"] / cost if cost else 0.0

    return sorted(summary.values(), key=lambda item: item["spelnia_warunek"], reverse=True)


def main() -> None:
    """Build the per-CL1 product summary for one client and report it.

    Loads the environment and configuration, reads the sales CSV (given
    explicitly or the newest one found for the client), fetches the CL1
    segments and the adsPRO products, writes the summary CSV and prints it as
    a table.

    Raises:
        SystemExit: If the client is not configured or no CL1 segment is found.
    """
    args = parse_args()
    load_env(ROOT / ".env")
    config = load_config()
    if args.client not in config.clients:
        known = ", ".join(sorted(config.clients))
        raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni: {known}")
    client_config = config.clients[args.client]

    sales_path = Path(args.sales_csv) if args.sales_csv else find_sales_csv(args.client)
    output_path = (
        Path(args.output)
        if args.output
        else ROOT / "clients" / args.client / "data" / "google_ads_product_sales_by_cl1_summary.csv"
    )
    output_path.parent.mkdir(parents=True, exist_ok=True)

    sales = read_sales(sales_path)
    segments = fetch_cl1_segments(client_config)
    if not segments:
        raise SystemExit("Nie znaleziono aktywnych kampanii [PLA_CL1], z ktorych mozna odczytac CL1.")
    print("CL1 z aktywnych kampanii PLA_CL1: " + ", ".join(segments))

    products = fetch_adspro_products(client_config, segments)
    _top_products, catch_all_products = split_products_by_top(products, sales, args.top_per_cl1)
    rows = _summarize_by_cl1(products, sales, catch_all_products)

    write_summary(output_path, rows)
    print_table(rows)
    print(f"\nCSV: {output_path}")
    print(f"Produkty adsPRO: {sum(row['produkty_adspro'] for row in rows)}")
    print(f"Plik sprzedazy Google Ads: {sales_path}")
# Run the summary only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()