from __future__ import annotations import argparse import csv import os import sys from datetime import datetime from pathlib import Path import requests ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) from scripts.product_cl1_sales_summary import ( fetch_cl1_segments, find_sales_csv, read_sales, split_products_by_top, ) from src.gads_v2.config import load_config, load_env from src.gads_v2.tasks.pla_cl1_sync import fetch_adspro_products def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Ustawia custom_label_4=catch_all w adsPRO dla produktow spoza top N w kazdym CL1." ) parser.add_argument("client", help="Domena klienta, np. laitica.pl") parser.add_argument("--top-per-cl1", type=int, default=20) parser.add_argument("--value", default="catch_all") parser.add_argument("--sales-csv") parser.add_argument("--apply", action="store_true", help="Bez tej flagi zapisuje tylko plan CSV.") parser.add_argument( "--action", default="product_custom_label_4_set", help="Nazwa akcji adsPRO ustawiajacej CL4.", ) return parser.parse_args() def csv_cell(value: object) -> str: text = str(value or "") if any(char in text for char in ['"', ";", "\r", "\n"]): return '"' + text.replace('"', '""') + '"' return text def write_plan(path: Path, rows: list[dict]) -> None: headers = [ "offer_id", "title", "custom_label_1", "current_custom_label_4", "target_custom_label_4", "conversions", "conversion_value", "cost", "roas", "status", "message", ] lines = [";".join(headers)] for row in rows: lines.append(";".join(csv_cell(row.get(header, "")) for header in headers)) path.write_text("\n".join(lines) + "\n", encoding="utf-8-sig") def set_cl4(api_url: str, api_key: str, client_id: str, action: str, offer_id: str, value: str) -> dict: payload = { "action": action, "api_key": api_key, "client_id": client_id, "offer_id": offer_id, "custom_label_4": value, } response = requests.post(api_url, data=payload, timeout=30) response.raise_for_status() response.encoding = "utf-8" try: return response.json() except ValueError: return {"result": "error", "message": response.text[:500]} def main() -> None: args = parse_args() load_env(ROOT / ".env") config = load_config() if args.client not in config.clients: known = ", ".join(sorted(config.clients)) raise SystemExit(f"Nie znaleziono klienta {args.client}. Dostepni: {known}") client_config = config.clients[args.client] if not client_config.adspro_client_id: raise SystemExit(f"Brak adspro_client_id dla {args.client}.") sales_path = Path(args.sales_csv) if args.sales_csv else find_sales_csv(args.client) sales = read_sales(sales_path) segments = fetch_cl1_segments(client_config) products = fetch_adspro_products(client_config, segments) _, catch_all_products = split_products_by_top(products, sales, args.top_per_cl1) rows = [] for product in catch_all_products: offer_id = str(product.get("offer_id") or "") stats = sales.get(offer_id, {}) rows.append( { "offer_id": offer_id, "title": product.get("title", ""), "custom_label_1": product.get("custom_label_1", ""), "current_custom_label_4": product.get("custom_label_4", ""), "target_custom_label_4": args.value, "conversions": str(product.get("conversions", stats.get("conversions", 0))), "conversion_value": str(product.get("conversion_value", stats.get("conversion_value", 0))), "cost": str(product.get("cost", stats.get("cost", 0))), "roas": str(product.get("roas", stats.get("roas", 0))), "status": "planned", "message": "", } ) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") plan_path = ROOT / "clients" / args.client / "data" / f"adspro_cl4_catch_all_plan_{timestamp}.csv" plan_path.parent.mkdir(parents=True, exist_ok=True) if args.apply: api_url = os.environ.get("ADSPRO_API_URL") api_key = os.environ.get("ADSPRO_API_KEY") if not api_url or not api_key: raise SystemExit("Brak ADSPRO_API_URL lub ADSPRO_API_KEY w .env.") for index, row in enumerate(rows, 1): result = set_cl4( api_url, api_key, client_config.adspro_client_id, args.action, row["offer_id"], args.value, ) if result.get("result") == "error": row["status"] = "error" row["message"] = result.get("message", "") print(f"{index}/{len(rows)} ERROR {row['offer_id']}: {row['message']}", flush=True) else: row["status"] = "updated" row["message"] = result.get("message", "") if index % 50 == 0 or index == len(rows): print(f"{index}/{len(rows)} zaktualizowano", flush=True) write_plan(plan_path, rows) updated = sum(1 for row in rows if row["status"] == "updated") errors = sum(1 for row in rows if row["status"] == "error") print(f"Produkty do CL4={args.value}: {len(rows)}") print(f"Zaktualizowano: {updated}") print(f"Bledy: {errors}") print(f"Raport: {plan_path}") if __name__ == "__main__": main()