This commit is contained in:
2026-05-16 16:31:40 +02:00
parent 75b9434de5
commit 173d734d04
29 changed files with 37539 additions and 271 deletions

View File

@@ -48,6 +48,7 @@ from .task_catalog import (
)
from .tasks.pla_settings_check import run_check_pla_settings
from .tasks.pla_cl1_sync import run_sync_pla_cl1
from .tasks.product_cl1_bestseller_split import run_split_pla_cl1_bestsellers
from .tasks.shopping_troas_ag_optimization import run_optimize_shopping_troas_ag
from .tasks.account_anomaly_check import run_check_account_anomalies
from .tasks.ad_asset_status_check import run_check_ad_asset_statuses
@@ -143,6 +144,7 @@ def main() -> None:
"--task",
choices=[
"sync_pla_cl1",
"split_pla_cl1_bestsellers",
"optimize_shopping_troas_ag",
"check_pla_settings",
"optimize_product_feed",
@@ -1337,6 +1339,16 @@ def run_task(
show_navigation=show_navigation,
)
return
if task_id == "split_pla_cl1_bestsellers":
run_split_pla_cl1_bestsellers(
client,
global_rules,
plan_only=plan_only,
apply_plan_path=apply_plan_path,
confirm_apply=confirm_apply,
show_navigation=show_navigation,
)
return
if task_id == "optimize_shopping_troas_ag":
run_optimize_shopping_troas_ag(
client,

View File

@@ -0,0 +1,790 @@
from __future__ import annotations
import json
import math
import os
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Any
import requests
from ..config import ClientConfig, client_dir
from ..google_ads import get_google_ads_client, run_query
from ..history import append_change_markdown, append_history, now_local
from ..table import print_table
from .pla_cl1_sync import (
fetch_adspro_products,
normalize_variant,
parse_allowed_labels,
parse_campaign_variant,
)
TASK_ID = "split_pla_cl1_bestsellers"
TASK_NAME = "Podzial produktow CL1 na bestsellery i catch_all"
DEFAULT_SETTINGS = {
"metric_days": 180,
"recent_days": 30,
"core_ratio": 0.20,
"min_core_products": 20,
"max_core_products": 80,
"min_catch_all_products": 10,
"rank_buffer": 3,
"catch_all_label": "catch_all",
"min_cost_for_roas_score": 20.0,
"min_cost_without_conversion": 50.0,
"max_cost_penalty": 25.0,
"protected_cl4_labels": ["paused"],
}
@dataclass
class ProductCl1BestsellerSplitPlan:
metric_window: dict
recent_window: dict
settings: dict
campaigns: list[dict]
segment_summaries: list[dict]
changes: list[dict]
warnings: list[str]
def to_dict(self) -> dict:
return {
"task": TASK_ID,
"task_name": TASK_NAME,
"metric_window": self.metric_window,
"recent_window": self.recent_window,
"settings": self.settings,
"campaigns": self.campaigns,
"segment_summaries": self.segment_summaries,
"changes": self.changes,
"warnings": self.warnings,
}
@classmethod
def from_dict(cls, data: dict) -> "ProductCl1BestsellerSplitPlan":
return cls(
metric_window=data.get("metric_window", {}),
recent_window=data.get("recent_window", {}),
settings=data.get("settings", {}),
campaigns=data.get("campaigns", []),
segment_summaries=data.get("segment_summaries", []),
changes=data.get("changes", []),
warnings=data.get("warnings", []),
)
def safe_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def format_decimal(value: Any, digits: int = 2) -> str:
return f"{safe_float(value):.{digits}f}"
def split_settings(client_config: ClientConfig, global_rules: dict) -> dict:
settings = dict(DEFAULT_SETTINGS)
rules = client_config.effective_rules(global_rules, TASK_ID)
for key in DEFAULT_SETTINGS:
if key in rules:
settings[key] = rules[key]
settings["metric_days"] = max(30, safe_int(settings["metric_days"], 180))
settings["recent_days"] = max(7, safe_int(settings["recent_days"], 30))
settings["core_ratio"] = min(max(safe_float(settings["core_ratio"], 0.20), 0.05), 0.80)
settings["min_core_products"] = max(1, safe_int(settings["min_core_products"], 20))
settings["max_core_products"] = max(settings["min_core_products"], safe_int(settings["max_core_products"], 80))
settings["min_catch_all_products"] = max(1, safe_int(settings["min_catch_all_products"], 10))
settings["rank_buffer"] = max(0, safe_int(settings["rank_buffer"], 3))
settings["catch_all_label"] = normalize_variant(str(settings["catch_all_label"] or "catch_all"))
settings["min_cost_for_roas_score"] = max(0.0, safe_float(settings["min_cost_for_roas_score"], 20.0))
settings["min_cost_without_conversion"] = max(0.0, safe_float(settings["min_cost_without_conversion"], 50.0))
settings["max_cost_penalty"] = max(0.0, safe_float(settings["max_cost_penalty"], 25.0))
protected = settings.get("protected_cl4_labels") or ["paused"]
if not isinstance(protected, list):
protected = ["paused"]
settings["protected_cl4_labels"] = sorted({normalize_variant(str(item)) for item in protected if str(item).strip()})
return settings
def adspro_credentials(client_config: ClientConfig) -> tuple[str, str, str]:
api_url = os.environ.get("ADSPRO_API_URL")
api_key = os.environ.get("ADSPRO_API_KEY")
if not api_url or not api_key:
raise RuntimeError("Brak ADSPRO_API_URL lub ADSPRO_API_KEY w .env.")
if not client_config.adspro_client_id:
raise RuntimeError(f"Brak adspro_client_id dla {client_config.domain} w config/clients.toml.")
return api_url, api_key, client_config.adspro_client_id
def set_adspro_cl4(
api_url: str,
api_key: str,
adspro_client_id: str,
offer_id: str,
value: str,
) -> dict:
response = requests.post(
api_url,
data={
"action": "product_custom_label_4_set",
"api_key": api_key,
"client_id": adspro_client_id,
"offer_id": offer_id,
"custom_label_4": value,
},
timeout=30,
)
response.raise_for_status()
response.encoding = "utf-8"
try:
data = response.json()
except ValueError:
data = {"result": "error", "message": response.text[:500]}
if data.get("result") == "error":
raise RuntimeError(data.get("message") or "adsPRO zwrocil blad.")
return data
def fetch_pla_cl1_campaigns(client_config: ClientConfig) -> list[dict]:
google_client = get_google_ads_client(use_proto_plus=True)
rows = run_query(
google_client,
client_config.safe_customer_id,
"""
SELECT campaign.id, campaign.name, campaign.status
FROM campaign
WHERE campaign.name LIKE '%PLA_CL1%'
AND campaign.status = 'ENABLED'
""",
)
campaigns = []
for row in rows:
campaigns.append(
{
"id": str(row.campaign.id),
"name": row.campaign.name,
"status": row.campaign.status.name,
"allowed": sorted(parse_allowed_labels(row.campaign.name)),
"variant": parse_campaign_variant(row.campaign.name),
}
)
return campaigns
def product_metrics_row() -> dict:
return {
"impressions": 0,
"clicks": 0,
"cost": 0.0,
"conversions": 0.0,
"conversion_value": 0.0,
"roas": 0.0,
}
def fetch_product_metrics(client_config: ClientConfig, start_date: str, end_date: str) -> dict[str, dict]:
google_client = get_google_ads_client(use_proto_plus=True)
rows = run_query(
google_client,
client_config.safe_customer_id,
f"""
SELECT
segments.product_item_id,
metrics.impressions,
metrics.clicks,
metrics.cost_micros,
metrics.conversions,
metrics.conversions_value
FROM shopping_performance_view
WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
""",
timeout=300.0,
)
metrics: dict[str, dict] = {}
for row in rows:
offer_id = str(row.segments.product_item_id or "").strip()
if not offer_id:
continue
record = metrics.setdefault(offer_id, product_metrics_row())
record["impressions"] += safe_int(row.metrics.impressions)
record["clicks"] += safe_int(row.metrics.clicks)
record["cost"] += safe_float(row.metrics.cost_micros) / 1_000_000
record["conversions"] += safe_float(row.metrics.conversions)
record["conversion_value"] += safe_float(row.metrics.conversions_value)
for record in metrics.values():
record["cost"] = round(record["cost"], 2)
record["conversions"] = round(record["conversions"], 4)
record["conversion_value"] = round(record["conversion_value"], 2)
record["roas"] = round(record["conversion_value"] / record["cost"], 4) if record["cost"] else 0.0
return metrics
def score_percentiles(rows: list[dict], metric_key: str, score_key: str) -> None:
positive_values = sorted({safe_float(row.get(metric_key)) for row in rows if safe_float(row.get(metric_key)) > 0})
if not positive_values:
for row in rows:
row[score_key] = 0.0
return
if len(positive_values) == 1:
only_value = positive_values[0]
for row in rows:
row[score_key] = 100.0 if safe_float(row.get(metric_key)) == only_value else 0.0
return
ranks = {value: index for index, value in enumerate(positive_values)}
max_rank = len(positive_values) - 1
for row in rows:
value = safe_float(row.get(metric_key))
row[score_key] = round(100.0 * ranks[value] / max_rank, 2) if value > 0 else 0.0
def add_scores(rows: list[dict], settings: dict) -> None:
max_cost = max((safe_float(row.get("cost_180d")) for row in rows), default=0.0)
for row in rows:
if safe_float(row.get("cost_180d")) >= settings["min_cost_for_roas_score"]:
row["roas_score_input"] = safe_float(row.get("roas_180d"))
else:
row["roas_score_input"] = 0.0
score_percentiles(rows, "conversion_value_180d", "conversion_value_180d_score")
score_percentiles(rows, "conversions_180d", "conversions_180d_score")
score_percentiles(rows, "conversion_value_30d", "conversion_value_30d_score")
score_percentiles(rows, "roas_score_input", "roas_180d_score")
for row in rows:
penalty = 0.0
if (
safe_float(row.get("conversions_180d")) <= 0
and safe_float(row.get("cost_180d")) >= settings["min_cost_without_conversion"]
and max_cost > 0
):
penalty = min(
settings["max_cost_penalty"],
10.0 + (settings["max_cost_penalty"] - 10.0) * safe_float(row.get("cost_180d")) / max_cost,
)
row["cost_without_conversion_penalty"] = round(penalty, 2)
row["score"] = round(
0.40 * row["conversion_value_180d_score"]
+ 0.25 * row["conversions_180d_score"]
+ 0.20 * row["conversion_value_30d_score"]
+ 0.15 * row["roas_180d_score"]
- penalty,
2,
)
def core_count_for_segment(products_count: int, settings: dict) -> int:
by_ratio = math.ceil(products_count * settings["core_ratio"])
count = max(settings["min_core_products"], by_ratio)
count = min(settings["max_core_products"], count)
return min(products_count, count)
def has_any_data(row: dict) -> bool:
return any(
safe_float(row.get(key)) > 0
for key in (
"impressions_180d",
"clicks_180d",
"cost_180d",
"conversions_180d",
"conversion_value_180d",
)
)
def has_sales_signal(row: dict) -> bool:
return safe_float(row.get("conversions_180d")) > 0 or safe_float(row.get("conversion_value_180d")) > 0
def build_rank_rows(
products: list[dict],
metrics_180d: dict[str, dict],
metrics_30d: dict[str, dict],
settings: dict,
) -> tuple[dict[str, list[dict]], Counter]:
catch_all = settings["catch_all_label"]
protected = set(settings["protected_cl4_labels"])
allowed = {"", catch_all}
skipped: Counter = Counter()
by_cl1: dict[str, list[dict]] = defaultdict(list)
for product in products:
offer_id = str(product.get("offer_id") or "").strip()
cl1 = str(product.get("custom_label_1") or "").strip()
current_cl4 = str(product.get("custom_label_4") or "").strip()
current_norm = normalize_variant(current_cl4)
if not offer_id or not cl1:
skipped["missing_offer_or_cl1"] += 1
continue
if current_norm in protected:
skipped["protected_cl4"] += 1
continue
if current_norm not in allowed:
skipped["other_cl4"] += 1
continue
m180 = metrics_180d.get(offer_id, product_metrics_row())
m30 = metrics_30d.get(offer_id, product_metrics_row())
row = {
"offer_id": offer_id,
"title": product.get("title", "") or "",
"custom_label_1": cl1,
"current_custom_label_4": current_cl4,
"current_custom_label_4_normalized": current_norm,
"impressions_180d": safe_int(m180.get("impressions")),
"clicks_180d": safe_int(m180.get("clicks")),
"cost_180d": round(safe_float(m180.get("cost")), 2),
"conversions_180d": round(safe_float(m180.get("conversions")), 4),
"conversion_value_180d": round(safe_float(m180.get("conversion_value")), 2),
"roas_180d": round(safe_float(m180.get("roas")), 4),
"impressions_30d": safe_int(m30.get("impressions")),
"clicks_30d": safe_int(m30.get("clicks")),
"cost_30d": round(safe_float(m30.get("cost")), 2),
"conversions_30d": round(safe_float(m30.get("conversions")), 4),
"conversion_value_30d": round(safe_float(m30.get("conversion_value")), 2),
"roas_30d": round(safe_float(m30.get("roas")), 4),
}
by_cl1[cl1].append(row)
return by_cl1, skipped
def campaign_variants_by_cl1(campaigns: list[dict]) -> dict[str, set[str]]:
variants: dict[str, set[str]] = defaultdict(set)
for campaign in campaigns:
for label in campaign.get("allowed", []):
variants[label].add(normalize_variant(campaign.get("variant", "")))
return variants
def ranked_sort_key(row: dict) -> tuple:
current_priority = 0 if row.get("current_custom_label_4_normalized") == "" else 1
return (
-safe_float(row.get("score")),
-safe_float(row.get("conversion_value_180d")),
-safe_float(row.get("conversions_180d")),
-safe_float(row.get("conversion_value_30d")),
-safe_float(row.get("roas_180d")),
current_priority,
str(row.get("title", "")).lower(),
str(row.get("offer_id", "")),
)
def build_segment_plan(
cl1: str,
rows: list[dict],
variants: set[str],
settings: dict,
) -> tuple[dict, list[dict]]:
catch_all = settings["catch_all_label"]
summary = {
"custom_label_1": cl1,
"status": "planned",
"reason": "",
"managed_products": len(rows),
"products_with_any_data": sum(1 for row in rows if has_any_data(row)),
"products_with_sales_signal": sum(1 for row in rows if has_sales_signal(row)),
"core_rank_target": 0,
"rank_buffer": settings["rank_buffer"],
"final_core_products": 0,
"final_catch_all_products": 0,
"planned_set_catch_all": 0,
"planned_clear_catch_all": 0,
}
if "" not in variants or catch_all not in variants:
summary["status"] = "skipped"
summary["reason"] = "brak aktywnej kampanii bazowej albo wariantu catch_all dla tego CL1"
return summary, []
core_count = core_count_for_segment(len(rows), settings)
catch_all_count = len(rows) - core_count
summary["core_rank_target"] = core_count
if len(rows) < settings["min_core_products"] + settings["min_catch_all_products"]:
summary["status"] = "skipped"
summary["reason"] = "za malo produktow do bezpiecznego podzialu"
return summary, []
if catch_all_count < settings["min_catch_all_products"]:
summary["status"] = "skipped"
summary["reason"] = "po podziale wariant catch_all bylby za maly"
return summary, []
if summary["products_with_sales_signal"] <= 0:
summary["status"] = "skipped"
summary["reason"] = "brak sprzedazy albo wartosci konwersji w oknie 180 dni"
return summary, []
add_scores(rows, settings)
ranked = sorted(rows, key=ranked_sort_key)
changes = []
final_core = 0
final_catch_all = 0
buffer = settings["rank_buffer"]
max_final_core = len(rows) - settings["min_catch_all_products"]
core_promotion_limit = min(max_final_core, max(settings["min_core_products"], core_count - buffer))
catch_all_move_after = min(max_final_core, core_count + buffer)
for rank, row in enumerate(ranked, 1):
current = row["current_custom_label_4_normalized"]
if current == catch_all:
target = "" if rank <= core_promotion_limit else catch_all
reason = f"produkt wraca do glownej kampanii: rank {rank} miesci sie w stabilnym rdzeniu TOP {core_promotion_limit}"
else:
target = catch_all if rank > catch_all_move_after else ""
reason = f"produkt przechodzi do catch_all: rank {rank} poza stabilnym progiem TOP {catch_all_move_after}"
if target == "":
final_core += 1
else:
final_catch_all += 1
if target == current:
continue
change = {
"offer_id": row["offer_id"],
"title": row["title"],
"custom_label_1": cl1,
"rank": rank,
"score": row["score"],
"current_custom_label_4": row["current_custom_label_4"],
"target_custom_label_4": target,
"action": "clear_catch_all" if target == "" else "set_catch_all",
"reason": reason,
"conversions_180d": row["conversions_180d"],
"conversion_value_180d": row["conversion_value_180d"],
"cost_180d": row["cost_180d"],
"roas_180d": row["roas_180d"],
"conversion_value_30d": row["conversion_value_30d"],
"cost_without_conversion_penalty": row["cost_without_conversion_penalty"],
}
changes.append(change)
summary["final_core_products"] = final_core
summary["final_catch_all_products"] = final_catch_all
summary["planned_set_catch_all"] = sum(1 for row in changes if row["action"] == "set_catch_all")
summary["planned_clear_catch_all"] = sum(1 for row in changes if row["action"] == "clear_catch_all")
return summary, changes
def build_product_cl1_bestseller_split_plan(
client_config: ClientConfig,
global_rules: dict,
) -> ProductCl1BestsellerSplitPlan:
settings = split_settings(client_config, global_rules)
ended_at = now_local().date() - timedelta(days=1)
metric_started_at = ended_at - timedelta(days=settings["metric_days"] - 1)
recent_started_at = ended_at - timedelta(days=settings["recent_days"] - 1)
metric_window = {"start": metric_started_at.isoformat(), "end": ended_at.isoformat(), "days": settings["metric_days"]}
recent_window = {"start": recent_started_at.isoformat(), "end": ended_at.isoformat(), "days": settings["recent_days"]}
campaigns = fetch_pla_cl1_campaigns(client_config)
variants_by_cl1 = campaign_variants_by_cl1(campaigns)
segments = sorted(variants_by_cl1)
warnings: list[str] = []
if not segments:
warnings.append("Nie znaleziono aktywnych kampanii PLA_CL1 z ktorych mozna odczytac CL1.")
return ProductCl1BestsellerSplitPlan(metric_window, recent_window, settings, campaigns, [], [], warnings)
metrics_180d = fetch_product_metrics(client_config, metric_window["start"], metric_window["end"])
metrics_30d = fetch_product_metrics(client_config, recent_window["start"], recent_window["end"])
products = fetch_adspro_products(client_config, segments)
by_cl1, skipped = build_rank_rows(products, metrics_180d, metrics_30d, settings)
if skipped["protected_cl4"]:
warnings.append(f"Pominieto produkty z chronionym CL4: {skipped['protected_cl4']}.")
if skipped["other_cl4"]:
warnings.append(f"Pominieto produkty z innym CL4 niz puste/catch_all: {skipped['other_cl4']}.")
if skipped["missing_offer_or_cl1"]:
warnings.append(f"Pominieto produkty bez offer_id albo CL1: {skipped['missing_offer_or_cl1']}.")
segment_summaries = []
changes = []
for cl1 in segments:
summary, segment_changes = build_segment_plan(cl1, by_cl1.get(cl1, []), variants_by_cl1.get(cl1, set()), settings)
segment_summaries.append(summary)
changes.extend(segment_changes)
changes.sort(key=lambda item: (item["custom_label_1"], item["rank"], item["offer_id"]))
return ProductCl1BestsellerSplitPlan(
metric_window=metric_window,
recent_window=recent_window,
settings=settings,
campaigns=campaigns,
segment_summaries=segment_summaries,
changes=changes,
warnings=warnings,
)
def save_plan(domain: str, plan: ProductCl1BestsellerSplitPlan) -> tuple[Path, Path]:
ts = now_local()
base = client_dir(domain) / "plans"
base.mkdir(parents=True, exist_ok=True)
stem = f"{ts.strftime('%Y-%m-%d_%H-%M-%S')}_{TASK_ID}"
json_path = base / f"{stem}.json"
md_path = base / f"{stem}.md"
payload = {"created_at": ts.isoformat(timespec="seconds"), "client": domain, **plan.to_dict()}
json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
lines = [
f"# Plan: {TASK_NAME}",
"",
f"Klient: {domain}",
f"Utworzono: {ts.isoformat(timespec='seconds')}",
f"Dane glowne: {plan.metric_window.get('start')} - {plan.metric_window.get('end')}",
f"Dane swiezosci: {plan.recent_window.get('start')} - {plan.recent_window.get('end')}",
"",
"## Podsumowanie",
"",
f"- Segmenty CL1: {len(plan.segment_summaries)}",
f"- Zmiany CL4 do wdrozenia: {len(plan.changes)}",
f"- Do catch_all: {sum(1 for row in plan.changes if row.get('action') == 'set_catch_all')}",
f"- Do glownej kampanii: {sum(1 for row in plan.changes if row.get('action') == 'clear_catch_all')}",
"",
]
if plan.warnings:
lines.extend(["## Uwagi", ""])
lines.extend(f"- {warning}" for warning in plan.warnings)
lines.append("")
if plan.segment_summaries:
lines.extend(
[
"## Podsumowanie po CL1",
"",
"| CL1 | Status | Produkty | Rdzen rankingu | Docelowo glowna | Docelowo catch_all | Do catch_all | Do glownej | Powod |",
"| --- | --- | ---: | ---: | ---: | ---: | ---: | ---: | --- |",
]
)
for row in plan.segment_summaries:
lines.append(
f"| {row.get('custom_label_1', '')} | {row.get('status', '')} | "
f"{row.get('managed_products', 0)} | {row.get('core_rank_target', 0)} | "
f"{row.get('final_core_products', 0)} | {row.get('final_catch_all_products', 0)} | "
f"{row.get('planned_set_catch_all', 0)} | {row.get('planned_clear_catch_all', 0)} | "
f"{row.get('reason', '')} |"
)
lines.append("")
if plan.changes:
lines.extend(
[
"## Zmiany do wdrozenia",
"",
"| CL1 | Rank | Produkt | Obecne CL4 | Docelowe CL4 | Score | Wartosc 180d | Koszt 180d | ROAS 180d | Powod |",
"| --- | ---: | --- | --- | --- | ---: | ---: | ---: | ---: | --- |",
]
)
for row in plan.changes:
lines.append(
f"| {row.get('custom_label_1', '')} | {row.get('rank', '')} | "
f"{row.get('offer_id', '')} | {row.get('current_custom_label_4', '')} | "
f"{row.get('target_custom_label_4', '')} | {format_decimal(row.get('score'))} | "
f"{format_decimal(row.get('conversion_value_180d'))} | {format_decimal(row.get('cost_180d'))} | "
f"{format_decimal(row.get('roas_180d'), 4)} | {row.get('reason', '')} |"
)
lines.append("")
md_path.write_text("\n".join(lines), encoding="utf-8")
return json_path, md_path
def print_plan(plan: ProductCl1BestsellerSplitPlan) -> None:
set_catch_all = sum(1 for row in plan.changes if row.get("action") == "set_catch_all")
clear_catch_all = sum(1 for row in plan.changes if row.get("action") == "clear_catch_all")
print(f"\nPlan: {TASK_NAME}")
print_table(
["Metryka", "Wartosc"],
[
["Dane glowne", f"{plan.metric_window.get('start', '')} - {plan.metric_window.get('end', '')}"],
["Dane swiezosci", f"{plan.recent_window.get('start', '')} - {plan.recent_window.get('end', '')}"],
["Segmenty CL1", str(len(plan.segment_summaries))],
["Zmiany CL4", str(len(plan.changes))],
["Do catch_all", str(set_catch_all)],
["Do glownej kampanii", str(clear_catch_all)],
],
)
if plan.segment_summaries:
print("\nPodsumowanie po CL1")
print_table(
["CL1", "Status", "Produkty", "Rdzen", "Glowna", "Catch_all", "Do catch_all", "Do glownej"],
[
[
row.get("custom_label_1", ""),
row.get("status", ""),
str(row.get("managed_products", 0)),
str(row.get("core_rank_target", 0)),
str(row.get("final_core_products", 0)),
str(row.get("final_catch_all_products", 0)),
str(row.get("planned_set_catch_all", 0)),
str(row.get("planned_clear_catch_all", 0)),
]
for row in plan.segment_summaries
],
)
skipped = [row for row in plan.segment_summaries if row.get("status") == "skipped"]
if skipped:
print("\nPominiete segmenty")
print_table(
["CL1", "Powod"],
[[row.get("custom_label_1", ""), row.get("reason", "")] for row in skipped],
)
if plan.changes:
print("\nNajwazniejsze dzialania")
rows = [
[
str(index),
row.get("custom_label_1", ""),
str(row.get("rank", "")),
row.get("offer_id", ""),
row.get("current_custom_label_4", ""),
row.get("target_custom_label_4", ""),
format_decimal(row.get("score")),
]
for index, row in enumerate(plan.changes[:10], 1)
]
print_table(["Nr", "CL1", "Rank", "Produkt", "Obecne CL4", "Docelowe CL4", "Score"], rows)
if len(plan.changes) > 10:
print(f"... oraz {len(plan.changes) - 10} kolejnych zmian")
if plan.warnings:
print("\nUwagi")
print_table(["Nr", "Uwaga"], [[str(index), warning] for index, warning in enumerate(plan.warnings, 1)])
def apply_plan(client_config: ClientConfig, plan: ProductCl1BestsellerSplitPlan, plan_path: str) -> None:
api_url, api_key, adspro_client_id = adspro_credentials(client_config)
applied = []
errors = []
for row in plan.changes:
try:
set_adspro_cl4(api_url, api_key, adspro_client_id, row["offer_id"], row["target_custom_label_4"])
applied.append(row)
except Exception as exc:
errors.append({**row, "error": str(exc)})
print(f"Blad zapisu {row['offer_id']}: {exc}")
print("\nWynik wdrozenia zmian CL4")
print(f"Wdrozono zmian: {len(applied)}")
print(f"Bledy: {len(errors)}")
change_rows = [
{
"klient": client_config.domain,
"produkt": row["offer_id"],
"CL1": row.get("custom_label_1", ""),
"pole": "custom_label_4",
"obecnie": row.get("current_custom_label_4", ""),
"docelowo": row.get("target_custom_label_4", ""),
}
for row in applied
]
changes_path = append_change_markdown(client_config.domain, TASK_NAME, change_rows)
history_path = append_history(
client_config.domain,
{
"task": TASK_NAME,
"status": "wdrozono zmiany" if not errors else "wdrozono czesciowo",
"product": ", ".join(row["offer_id"] for row in applied[:10]),
"plan_path": plan_path,
"summary": {
"changes": len(plan.changes),
"applied": len(applied),
"errors": len(errors),
"set_catch_all": sum(1 for row in applied if row.get("action") == "set_catch_all"),
"clear_catch_all": sum(1 for row in applied if row.get("action") == "clear_catch_all"),
},
},
)
print(f"Historia JSONL: {history_path}")
print(f"Historia Markdown: {changes_path}")
def print_next_navigation(domain: str) -> None:
print("\nCo dalej:")
print(f"1. Lista zadan klienta {domain}")
print("2. Lista klientow")
print("3. Zakoncz")
print("\nKomendy:")
print(f"1 -> python gads.py analiza-klienta --client {domain}")
print("2 -> python gads.py analiza-klienta")
def run_split_pla_cl1_bestsellers(
client_config: ClientConfig,
global_rules: dict,
plan_only: bool = False,
apply_plan_path: str | None = None,
confirm_apply: str | None = None,
show_navigation: bool = True,
) -> None:
if apply_plan_path:
if confirm_apply != "TAK":
print("Do wdrozenia planu wymagane jest --confirm-apply TAK.")
if show_navigation:
print_next_navigation(client_config.domain)
return
data = json.loads(Path(apply_plan_path).read_text(encoding="utf-8"))
if data.get("client") != client_config.domain:
print(f"Plan jest dla klienta {data.get('client')}, a wybrano {client_config.domain}.")
if show_navigation:
print_next_navigation(client_config.domain)
return
if data.get("task") != TASK_ID:
print(f"Plan jest dla zadania {data.get('task')}, a oczekiwano {TASK_ID}.")
if show_navigation:
print_next_navigation(client_config.domain)
return
plan = ProductCl1BestsellerSplitPlan.from_dict(data)
print_plan(plan)
apply_plan(client_config, plan, apply_plan_path)
if show_navigation:
print_next_navigation(client_config.domain)
return
print(f"\nKlient: {client_config.domain}")
print("Przygotowuje plan podzialu produktow CL1 na bestsellery i catch_all...")
plan = build_product_cl1_bestseller_split_plan(client_config, global_rules)
print_plan(plan)
json_path, md_path = save_plan(client_config.domain, plan)
print(f"\nPlan JSON: {json_path}")
print(f"Plan Markdown: {md_path}")
append_history(
client_config.domain,
{
"task": TASK_NAME,
"status": "plan przygotowany",
"summary": {
"segments": len(plan.segment_summaries),
"changes": len(plan.changes),
"set_catch_all": sum(1 for row in plan.changes if row.get("action") == "set_catch_all"),
"clear_catch_all": sum(1 for row in plan.changes if row.get("action") == "clear_catch_all"),
},
},
)
if plan_only:
print("\nTryb plan-only: zmiany nie zostaly wdrozone.")
else:
print("\nBrak automatycznego wdrozenia. Uzyj zapisanego planu i potwierdzenia, aby wdrozyc zmiany CL4.")
if show_navigation:
print_next_navigation(client_config.domain)