Files
google-ads-ver-2/src/gads_v2/google_ads.py
2026-05-15 09:28:11 +02:00

34 lines
1.2 KiB
Python

from __future__ import annotations
import os
from typing import Any
from google.ads.googleads.client import GoogleAdsClient
def get_google_ads_client(use_proto_plus: bool = True) -> GoogleAdsClient:
developer_token = os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ.get(
"GOOGLE_ADS_DEVELOPER_TOKNE"
)
if not developer_token:
raise RuntimeError("Brak GOOGLE_ADS_DEVELOPER_TOKEN w .env.")
return GoogleAdsClient.load_from_dict(
{
"developer_token": developer_token,
"client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"],
"client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"],
"refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"],
"login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"],
"use_proto_plus": use_proto_plus,
}
)
def run_query(client: GoogleAdsClient, customer_id: str, query: str, timeout: float = 300.0) -> list[Any]:
service = client.get_service("GoogleAdsService")
rows = []
for batch in service.search_stream(customer_id=customer_id, query=query, timeout=timeout):
rows.extend(batch.results)
return rows