from __future__ import annotations import os from typing import Any from google.ads.googleads.client import GoogleAdsClient def get_google_ads_client(use_proto_plus: bool = True) -> GoogleAdsClient: developer_token = os.environ.get("GOOGLE_ADS_DEVELOPER_TOKEN") or os.environ.get( "GOOGLE_ADS_DEVELOPER_TOKNE" ) if not developer_token: raise RuntimeError("Brak GOOGLE_ADS_DEVELOPER_TOKEN w .env.") return GoogleAdsClient.load_from_dict( { "developer_token": developer_token, "client_id": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_ID"], "client_secret": os.environ["GOOGLE_ADS_OAUTH2_CLIENT_SECRET"], "refresh_token": os.environ["GOOGLE_ADS_OAUTH2_REFRESH_TOKEN"], "login_customer_id": os.environ["GOOGLE_ADS_MANAGER_ACCOUNT_ID"], "use_proto_plus": use_proto_plus, } ) def run_query(client: GoogleAdsClient, customer_id: str, query: str, timeout: float = 300.0) -> list[Any]: service = client.get_service("GoogleAdsService") rows = [] for batch in service.search_stream(customer_id=customer_id, query=query, timeout=timeout): rows.extend(batch.results) return rows