update
This commit is contained in:
398
autoload/services/class.XmlFeedImporter.php
Normal file
398
autoload/services/class.XmlFeedImporter.php
Normal file
@@ -0,0 +1,398 @@
|
||||
<?php
|
||||
namespace services;
|
||||
|
||||
class XmlFeedImporter
|
||||
{
|
||||
const BATCH_SIZE = 200;
|
||||
const HTTP_TIMEOUT = 300;
|
||||
const GMC_NS = 'http://base.google.com/ns/1.0';
|
||||
|
||||
/**
|
||||
* Importuje feed XML (Google Merchant) dla klienta i wzbogaca tabele products.
|
||||
* Strumieniowy parser (XMLReader) odporny na feedy z kilkoma tysiacami pozycji.
|
||||
*
|
||||
* Aktualizuje pola: title, description, custom_label_1, price (zrodlowe dane z feedu).
|
||||
* NIE nadpisuje pol edytowanych przez skrypty/AI: title_gmc, description_gmc (sluza do supplemental feed -> GMC).
|
||||
* Dla pozycji nieobecnej w products tworzy nowy rekord.
|
||||
*
|
||||
* @param int $client_id
|
||||
* @return array raport z polami: feed_url, fetched, updated, inserted, skipped, errors, peak_memory_mb, duration_ms
|
||||
*/
|
||||
static public function import_for_client( $client_id )
|
||||
{
|
||||
global $mdb;
|
||||
|
||||
$client_id = (int) $client_id;
|
||||
$report = [
|
||||
'feed_url' => '',
|
||||
'fetched' => 0,
|
||||
'updated' => 0,
|
||||
'inserted' => 0,
|
||||
'skipped' => 0,
|
||||
'errors' => [],
|
||||
'peak_memory_mb' => 0,
|
||||
'duration_ms' => 0,
|
||||
];
|
||||
|
||||
if ( $client_id <= 0 )
|
||||
{
|
||||
$report['errors'][] = 'Nieprawidlowy client_id';
|
||||
return $report;
|
||||
}
|
||||
|
||||
$client = $mdb -> get( 'clients', [ 'id', 'xml_feed_url' ], [ 'id' => $client_id ] );
|
||||
$feed_url = trim( (string) ( $client['xml_feed_url'] ?? '' ) );
|
||||
if ( $feed_url === '' )
|
||||
{
|
||||
$report['skipped_reason'] = 'no_feed';
|
||||
return $report;
|
||||
}
|
||||
$report['feed_url'] = $feed_url;
|
||||
|
||||
$started_at = microtime( true );
|
||||
@set_time_limit( 600 );
|
||||
@ini_set( 'memory_limit', '512M' );
|
||||
|
||||
$tmp_file = tempnam( sys_get_temp_dir(), 'xmlfeed_' );
|
||||
if ( $tmp_file === false )
|
||||
{
|
||||
$report['errors'][] = 'Nie mozna utworzyc pliku tymczasowego';
|
||||
return $report;
|
||||
}
|
||||
|
||||
$download_ok = self::download_feed( $feed_url, $tmp_file, $report );
|
||||
if ( !$download_ok )
|
||||
{
|
||||
@unlink( $tmp_file );
|
||||
return $report;
|
||||
}
|
||||
|
||||
$reader = new \XMLReader();
|
||||
if ( !$reader -> open( $tmp_file ) )
|
||||
{
|
||||
$report['errors'][] = 'Nie mozna otworzyc feedu XML do parsowania';
|
||||
@unlink( $tmp_file );
|
||||
return $report;
|
||||
}
|
||||
|
||||
$batch = [];
|
||||
while ( $reader -> read() )
|
||||
{
|
||||
if ( $reader -> nodeType !== \XMLReader::ELEMENT )
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
$local = $reader -> localName;
|
||||
if ( $local !== 'item' && $local !== 'entry' )
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
$node = $reader -> expand();
|
||||
if ( !$node )
|
||||
{
|
||||
$report['skipped']++;
|
||||
continue;
|
||||
}
|
||||
|
||||
$item = self::extract_item_fields( $node );
|
||||
if ( $item === null || $item['offer_id'] === '' )
|
||||
{
|
||||
$report['skipped']++;
|
||||
continue;
|
||||
}
|
||||
|
||||
$report['fetched']++;
|
||||
$batch[] = $item;
|
||||
|
||||
if ( count( $batch ) >= self::BATCH_SIZE )
|
||||
{
|
||||
self::flush_batch( $client_id, $batch, $report );
|
||||
$batch = [];
|
||||
gc_collect_cycles();
|
||||
}
|
||||
}
|
||||
catch ( \Throwable $e )
|
||||
{
|
||||
$report['skipped']++;
|
||||
if ( count( $report['errors'] ) < 20 )
|
||||
{
|
||||
$report['errors'][] = 'Item parse error: ' . $e -> getMessage();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if ( !empty( $batch ) )
|
||||
{
|
||||
self::flush_batch( $client_id, $batch, $report );
|
||||
$batch = [];
|
||||
}
|
||||
|
||||
$reader -> close();
|
||||
@unlink( $tmp_file );
|
||||
|
||||
$mdb -> update( 'clients', [ 'xml_feed_last_sync_at' => date( 'Y-m-d H:i:s' ) ], [ 'id' => $client_id ] );
|
||||
|
||||
$report['peak_memory_mb'] = round( memory_get_peak_usage( true ) / 1024 / 1024, 1 );
|
||||
$report['duration_ms'] = (int) round( ( microtime( true ) - $started_at ) * 1000 );
|
||||
|
||||
return $report;
|
||||
}
|
||||
|
||||
static private function download_feed( $url, $tmp_file, &$report )
|
||||
{
|
||||
$fp = fopen( $tmp_file, 'wb' );
|
||||
if ( $fp === false )
|
||||
{
|
||||
$report['errors'][] = 'Nie mozna otworzyc pliku tymczasowego do zapisu';
|
||||
return false;
|
||||
}
|
||||
|
||||
$ch = curl_init( $url );
|
||||
curl_setopt_array( $ch, [
|
||||
CURLOPT_FILE => $fp,
|
||||
CURLOPT_FOLLOWLOCATION => true,
|
||||
CURLOPT_MAXREDIRS => 5,
|
||||
CURLOPT_TIMEOUT => self::HTTP_TIMEOUT,
|
||||
CURLOPT_CONNECTTIMEOUT => 30,
|
||||
CURLOPT_FAILONERROR => true,
|
||||
CURLOPT_USERAGENT => 'adsPRO XML Feed Importer/1.0',
|
||||
CURLOPT_SSL_VERIFYPEER => true,
|
||||
CURLOPT_ENCODING => '',
|
||||
] );
|
||||
|
||||
$ok = curl_exec( $ch );
|
||||
$http_code = (int) curl_getinfo( $ch, CURLINFO_HTTP_CODE );
|
||||
$err = curl_error( $ch );
|
||||
curl_close( $ch );
|
||||
fclose( $fp );
|
||||
|
||||
if ( !$ok || ( $http_code >= 400 ) )
|
||||
{
|
||||
$report['errors'][] = 'Pobieranie feedu nieudane (HTTP ' . $http_code . '): ' . $err;
|
||||
return false;
|
||||
}
|
||||
|
||||
if ( filesize( $tmp_file ) === 0 )
|
||||
{
|
||||
$report['errors'][] = 'Feed jest pusty';
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static private function extract_item_fields( \DOMNode $node )
|
||||
{
|
||||
$doc = new \DOMDocument();
|
||||
$imported = $doc -> importNode( $node, true );
|
||||
$doc -> appendChild( $imported );
|
||||
|
||||
$sxe = simplexml_import_dom( $doc -> documentElement );
|
||||
if ( $sxe === false )
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
$g = $sxe -> children( self::GMC_NS );
|
||||
|
||||
$offer_id = '';
|
||||
if ( isset( $g -> id ) )
|
||||
{
|
||||
$offer_id = trim( (string) $g -> id );
|
||||
}
|
||||
if ( $offer_id === '' && isset( $sxe -> id ) )
|
||||
{
|
||||
$offer_id = trim( (string) $sxe -> id );
|
||||
}
|
||||
if ( $offer_id === '' )
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
$title = '';
|
||||
if ( isset( $g -> title ) )
|
||||
{
|
||||
$title = trim( (string) $g -> title );
|
||||
}
|
||||
if ( $title === '' && isset( $sxe -> title ) )
|
||||
{
|
||||
$title = trim( (string) $sxe -> title );
|
||||
}
|
||||
|
||||
$description = '';
|
||||
if ( isset( $g -> description ) )
|
||||
{
|
||||
$description = trim( (string) $g -> description );
|
||||
}
|
||||
if ( $description === '' && isset( $sxe -> description ) )
|
||||
{
|
||||
$description = trim( (string) $sxe -> description );
|
||||
}
|
||||
|
||||
$custom_label_1 = isset( $g -> custom_label_1 ) ? trim( (string) $g -> custom_label_1 ) : '';
|
||||
|
||||
$price = null;
|
||||
if ( isset( $g -> price ) )
|
||||
{
|
||||
$price_raw = trim( (string) $g -> price );
|
||||
$price = self::parse_price( $price_raw );
|
||||
}
|
||||
if ( $price === null && isset( $g -> sale_price ) )
|
||||
{
|
||||
$price = self::parse_price( trim( (string) $g -> sale_price ) );
|
||||
}
|
||||
|
||||
return [
|
||||
'offer_id' => self::truncate( $offer_id, 255 ),
|
||||
'title' => self::truncate( $title, 255 ),
|
||||
'description' => $description,
|
||||
'custom_label_1' => self::truncate( $custom_label_1, 255 ),
|
||||
'price' => $price,
|
||||
];
|
||||
}
|
||||
|
||||
static private function parse_price( $raw )
|
||||
{
|
||||
if ( $raw === '' )
|
||||
{
|
||||
return null;
|
||||
}
|
||||
if ( preg_match( '/([0-9]+(?:[.,][0-9]+)?)/', $raw, $m ) )
|
||||
{
|
||||
$value = (float) str_replace( ',', '.', $m[1] );
|
||||
if ( $value > 0 )
|
||||
{
|
||||
return round( $value, 2 );
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
static private function truncate( $value, $max )
|
||||
{
|
||||
if ( function_exists( 'mb_substr' ) )
|
||||
{
|
||||
return mb_substr( (string) $value, 0, $max, 'UTF-8' );
|
||||
}
|
||||
return substr( (string) $value, 0, $max );
|
||||
}
|
||||
|
||||
static private function flush_batch( $client_id, array $batch, array &$report )
|
||||
{
|
||||
global $mdb;
|
||||
|
||||
if ( empty( $batch ) )
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
$pdo = $mdb -> pdo;
|
||||
|
||||
try
|
||||
{
|
||||
$pdo -> beginTransaction();
|
||||
|
||||
$offer_ids = [];
|
||||
foreach ( $batch as $item )
|
||||
{
|
||||
$offer_ids[ $item['offer_id'] ] = true;
|
||||
}
|
||||
$offer_ids = array_keys( $offer_ids );
|
||||
|
||||
$existing = [];
|
||||
if ( !empty( $offer_ids ) )
|
||||
{
|
||||
$placeholders = [];
|
||||
$select_params = [ ':client_id' => $client_id ];
|
||||
foreach ( $offer_ids as $idx => $oid )
|
||||
{
|
||||
$key = ':oid_' . $idx;
|
||||
$placeholders[] = $key;
|
||||
$select_params[ $key ] = $oid;
|
||||
}
|
||||
$sel_sql = 'SELECT id, offer_id FROM products WHERE client_id = :client_id AND offer_id IN (' . implode( ', ', $placeholders ) . ')';
|
||||
$sel_stmt = $pdo -> prepare( $sel_sql );
|
||||
$sel_stmt -> execute( $select_params );
|
||||
while ( $row = $sel_stmt -> fetch( \PDO::FETCH_ASSOC ) )
|
||||
{
|
||||
// jeden offer_id moze miec wiele wierszy (legacy duplikaty) - zbieramy wszystkie id
|
||||
$existing[ (string) $row['offer_id'] ][] = (int) $row['id'];
|
||||
}
|
||||
}
|
||||
|
||||
$update_stmt = $pdo -> prepare(
|
||||
'UPDATE products SET
|
||||
title = :title,
|
||||
description = :description,
|
||||
custom_label_1 = COALESCE(:custom_label_1, custom_label_1),
|
||||
price = COALESCE(:price, price)
|
||||
WHERE id = :id'
|
||||
);
|
||||
|
||||
$insert_stmt = $pdo -> prepare(
|
||||
'INSERT INTO products (client_id, offer_id, title, description, custom_label_1, price)
|
||||
VALUES (:client_id, :offer_id, :title, :description, :custom_label_1, :price)'
|
||||
);
|
||||
|
||||
$updated_count = 0;
|
||||
$inserted_count = 0;
|
||||
|
||||
foreach ( $batch as $item )
|
||||
{
|
||||
$title = $item['title'] !== '' ? $item['title'] : null;
|
||||
$desc = $item['description'] !== '' ? $item['description'] : null;
|
||||
$cl1 = $item['custom_label_1'] !== '' ? $item['custom_label_1'] : null;
|
||||
$price = $item['price'];
|
||||
|
||||
if ( !empty( $existing[ $item['offer_id'] ] ) )
|
||||
{
|
||||
// aktualizujemy WSZYSTKIE legacy duplikaty (utrzymujemy spojnosc danych)
|
||||
foreach ( $existing[ $item['offer_id'] ] as $row_id )
|
||||
{
|
||||
$update_stmt -> execute( [
|
||||
':title' => $title,
|
||||
':description' => $desc,
|
||||
':custom_label_1' => $cl1,
|
||||
':price' => $price,
|
||||
':id' => $row_id,
|
||||
] );
|
||||
$updated_count++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
$insert_stmt -> execute( [
|
||||
':client_id' => $client_id,
|
||||
':offer_id' => $item['offer_id'],
|
||||
':title' => $title,
|
||||
':description' => $desc,
|
||||
':custom_label_1' => $cl1,
|
||||
':price' => $price,
|
||||
] );
|
||||
$inserted_count++;
|
||||
}
|
||||
}
|
||||
|
||||
$pdo -> commit();
|
||||
|
||||
$report['updated'] += $updated_count;
|
||||
$report['inserted'] += $inserted_count;
|
||||
}
|
||||
catch ( \Throwable $e )
|
||||
{
|
||||
if ( $pdo -> inTransaction() )
|
||||
{
|
||||
$pdo -> rollBack();
|
||||
}
|
||||
if ( count( $report['errors'] ) < 20 )
|
||||
{
|
||||
$report['errors'][] = 'Batch flush error: ' . $e -> getMessage();
|
||||
}
|
||||
$report['skipped'] += count( $batch );
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user