feat(cronjob): implement CronJobProcessor and CronJobRepository for job scheduling and processing
- Added CronJobProcessor class to handle job creation and queue processing. - Implemented CronJobRepository for database interactions related to cron jobs. - Introduced CronJobType class to define job types, priorities, and statuses. - Created ApiloLogger for logging actions related to job processing. - Initialized apilo-sync-queue.json for job queue management.
This commit is contained in:
@@ -7,17 +7,21 @@ class OrderAdminService
|
||||
private $productRepo;
|
||||
private $settingsRepo;
|
||||
private $transportRepo;
|
||||
/** @var \Domain\CronJob\CronJobRepository|null */
|
||||
private $cronJobRepo;
|
||||
|
||||
public function __construct(
|
||||
OrderRepository $orders,
|
||||
$productRepo = null,
|
||||
$settingsRepo = null,
|
||||
$transportRepo = null
|
||||
$transportRepo = null,
|
||||
$cronJobRepo = null
|
||||
) {
|
||||
$this->orders = $orders;
|
||||
$this->productRepo = $productRepo;
|
||||
$this->settingsRepo = $settingsRepo;
|
||||
$this->transportRepo = $transportRepo;
|
||||
$this->cronJobRepo = $cronJobRepo;
|
||||
}
|
||||
|
||||
public function details(int $orderId): array
|
||||
@@ -30,6 +34,14 @@ class OrderAdminService
|
||||
return $this->orders->orderStatuses();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{names: array<int, string>, colors: array<int, string>}
|
||||
*/
|
||||
public function statusData(): array
|
||||
{
|
||||
return $this->orders->orderStatusData();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{items: array<int, array<string, mixed>>, total: int}
|
||||
*/
|
||||
@@ -385,17 +397,38 @@ class OrderAdminService
|
||||
global $mdb;
|
||||
|
||||
if ($orderId <= 0) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$mdb,
|
||||
'resend_order',
|
||||
$orderId,
|
||||
'Nieprawidlowe ID zamowienia (orderId <= 0)',
|
||||
['order_id' => $orderId]
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
$order = $this->orders->findForAdmin($orderId);
|
||||
if (empty($order) || empty($order['apilo_order_id'])) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$mdb,
|
||||
'resend_order',
|
||||
$orderId,
|
||||
'Brak zamowienia lub brak apilo_order_id',
|
||||
['order_found' => !empty($order), 'apilo_order_id' => $order['apilo_order_id'] ?? null]
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
$integrationsRepository = new \Domain\Integrations\IntegrationsRepository( $mdb );
|
||||
$accessToken = $integrationsRepository -> apiloGetAccessToken();
|
||||
if (!$accessToken) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$mdb,
|
||||
'resend_order',
|
||||
$orderId,
|
||||
'Nie udalo sie uzyskac tokenu Apilo (access token)',
|
||||
['apilo_order_id' => $order['apilo_order_id']]
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -417,13 +450,29 @@ class OrderAdminService
|
||||
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
|
||||
|
||||
$apiloResultRaw = curl_exec($ch);
|
||||
$http_code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
|
||||
$apiloResult = json_decode((string)$apiloResultRaw, true);
|
||||
|
||||
if (!is_array($apiloResult) || (int)($apiloResult['updates'] ?? 0) !== 1) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$mdb,
|
||||
'resend_order',
|
||||
$orderId,
|
||||
'Błąd ponownego wysyłania zamówienia do Apilo (HTTP: ' . $http_code . ')',
|
||||
['apilo_order_id' => $order['apilo_order_id'], 'http_code' => $http_code, 'response' => $apiloResult]
|
||||
);
|
||||
curl_close($ch);
|
||||
return false;
|
||||
}
|
||||
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$mdb,
|
||||
'resend_order',
|
||||
$orderId,
|
||||
'Zamówienie ponownie wysłane do Apilo (apilo_order_id: ' . $order['apilo_order_id'] . ')',
|
||||
['apilo_order_id' => $order['apilo_order_id'], 'http_code' => $http_code, 'response' => $apiloResult]
|
||||
);
|
||||
|
||||
$query = "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'pp_shop_orders' AND COLUMN_NAME != 'id'";
|
||||
$stmt = $mdb->query($query);
|
||||
$columns = $stmt ? $stmt->fetchAll(\PDO::FETCH_COLUMN) : [];
|
||||
@@ -474,75 +523,6 @@ class OrderAdminService
|
||||
return $this->orders->deleteOrder($orderId);
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Apilo sync queue (migrated from \shop\Order)
|
||||
// =========================================================================
|
||||
|
||||
private const APILO_SYNC_QUEUE_FILE = '/temp/apilo-sync-queue.json';
|
||||
|
||||
public function processApiloSyncQueue(int $limit = 10): int
|
||||
{
|
||||
$queue = self::loadApiloSyncQueue();
|
||||
if (!\Shared\Helpers\Helpers::is_array_fix($queue)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
$processed = 0;
|
||||
|
||||
foreach ($queue as $key => $task)
|
||||
{
|
||||
if ($processed >= $limit) {
|
||||
break;
|
||||
}
|
||||
|
||||
$order_id = (int)($task['order_id'] ?? 0);
|
||||
if ($order_id <= 0) {
|
||||
unset($queue[$key]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$order = $this->orders->findRawById($order_id);
|
||||
if (!$order) {
|
||||
unset($queue[$key]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$error = '';
|
||||
$sync_failed = false;
|
||||
|
||||
$payment_pending = !empty($task['payment']) && (int)$order['paid'] === 1;
|
||||
if ($payment_pending && (int)$order['apilo_order_id']) {
|
||||
if (!$this->syncApiloPayment($order)) {
|
||||
$sync_failed = true;
|
||||
$error = 'payment_sync_failed';
|
||||
}
|
||||
}
|
||||
|
||||
$status_pending = isset($task['status']) && $task['status'] !== null && $task['status'] !== '';
|
||||
if (!$sync_failed && $status_pending && (int)$order['apilo_order_id']) {
|
||||
if (!$this->syncApiloStatus($order, (int)$task['status'])) {
|
||||
$sync_failed = true;
|
||||
$error = 'status_sync_failed';
|
||||
}
|
||||
}
|
||||
|
||||
if ($sync_failed) {
|
||||
$task['attempts'] = (int)($task['attempts'] ?? 0) + 1;
|
||||
$task['last_error'] = $error;
|
||||
$task['updated_at'] = date('Y-m-d H:i:s');
|
||||
$queue[$key] = $task;
|
||||
} else {
|
||||
unset($queue[$key]);
|
||||
}
|
||||
|
||||
$processed++;
|
||||
}
|
||||
|
||||
self::saveApiloSyncQueue($queue);
|
||||
|
||||
return $processed;
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Private: email
|
||||
// =========================================================================
|
||||
@@ -600,6 +580,17 @@ class OrderAdminService
|
||||
$apilo_settings = $integrationsRepository->getSettings('apilo');
|
||||
|
||||
if (!$apilo_settings['enabled'] || !$apilo_settings['access-token'] || !$apilo_settings['sync_orders']) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'payment_sync',
|
||||
(int)$order['id'],
|
||||
'Pominięto sync płatności — Apilo wyłączone lub brak tokenu/sync_orders',
|
||||
[
|
||||
'enabled' => $apilo_settings['enabled'] ?? false,
|
||||
'has_token' => !empty($apilo_settings['access-token']),
|
||||
'sync_orders' => $apilo_settings['sync_orders'] ?? false,
|
||||
]
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -607,8 +598,25 @@ class OrderAdminService
|
||||
self::appendApiloLog("SET AS PAID\n" . print_r($order, true));
|
||||
}
|
||||
|
||||
if ($order['apilo_order_id'] && !$this->syncApiloPayment($order)) {
|
||||
self::queueApiloSync((int)$order['id'], true, null, 'payment_sync_failed');
|
||||
if (!$order['apilo_order_id']) {
|
||||
// Zamówienie jeszcze nie wysłane do Apilo — kolejkuj sync płatności na później
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'payment_sync',
|
||||
(int)$order['id'],
|
||||
'Brak apilo_order_id — płatność zakolejkowana do sync',
|
||||
['apilo_order_id' => $order['apilo_order_id'] ?? null]
|
||||
);
|
||||
$this->queueApiloSync((int)$order['id'], true, null, 'awaiting_apilo_order');
|
||||
} elseif (!$this->syncApiloPayment($order)) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'payment_sync',
|
||||
(int)$order['id'],
|
||||
'Sync płatności nieudany — zakolejkowano ponowną próbę',
|
||||
['apilo_order_id' => $order['apilo_order_id']]
|
||||
);
|
||||
$this->queueApiloSync((int)$order['id'], true, null, 'payment_sync_failed');
|
||||
}
|
||||
}
|
||||
|
||||
@@ -621,6 +629,18 @@ class OrderAdminService
|
||||
$apilo_settings = $integrationsRepository->getSettings('apilo');
|
||||
|
||||
if (!$apilo_settings['enabled'] || !$apilo_settings['access-token'] || !$apilo_settings['sync_orders']) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'status_sync',
|
||||
(int)$order['id'],
|
||||
'Pominięto sync statusu — Apilo wyłączone lub brak tokenu/sync_orders',
|
||||
[
|
||||
'target_status' => $status,
|
||||
'enabled' => $apilo_settings['enabled'] ?? false,
|
||||
'has_token' => !empty($apilo_settings['access-token']),
|
||||
'sync_orders' => $apilo_settings['sync_orders'] ?? false,
|
||||
]
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -628,19 +648,36 @@ class OrderAdminService
|
||||
self::appendApiloLog("UPDATE STATUS\n" . print_r($order, true));
|
||||
}
|
||||
|
||||
if ($order['apilo_order_id'] && !$this->syncApiloStatus($order, $status)) {
|
||||
self::queueApiloSync((int)$order['id'], false, $status, 'status_sync_failed');
|
||||
if (!$order['apilo_order_id']) {
|
||||
// Zamówienie jeszcze nie wysłane do Apilo — kolejkuj sync statusu na później
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'status_sync',
|
||||
(int)$order['id'],
|
||||
'Brak apilo_order_id — status zakolejkowany do sync',
|
||||
['apilo_order_id' => $order['apilo_order_id'] ?? null, 'target_status' => $status]
|
||||
);
|
||||
$this->queueApiloSync((int)$order['id'], false, $status, 'awaiting_apilo_order');
|
||||
} elseif (!$this->syncApiloStatus($order, $status)) {
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'status_sync',
|
||||
(int)$order['id'],
|
||||
'Sync statusu nieudany — zakolejkowano ponowną próbę',
|
||||
['apilo_order_id' => $order['apilo_order_id'], 'target_status' => $status]
|
||||
);
|
||||
$this->queueApiloSync((int)$order['id'], false, $status, 'status_sync_failed');
|
||||
}
|
||||
}
|
||||
|
||||
private function syncApiloPayment(array $order): bool
|
||||
public function syncApiloPayment(array $order): bool
|
||||
{
|
||||
global $config;
|
||||
|
||||
$db = $this->orders->getDb();
|
||||
$integrationsRepository = new \Domain\Integrations\IntegrationsRepository($db);
|
||||
|
||||
if (!(int)$order['apilo_order_id']) {
|
||||
if (empty($order['apilo_order_id'])) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -677,20 +714,37 @@ class OrderAdminService
|
||||
self::appendApiloLog("PAYMENT RESPONSE\nHTTP: " . $http_code . "\nCURL: " . $curl_error . "\n" . print_r($apilo_response, true));
|
||||
}
|
||||
|
||||
$success = ($curl_error === '' && $http_code >= 200 && $http_code < 300);
|
||||
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'payment_sync',
|
||||
(int)$order['id'],
|
||||
$success
|
||||
? 'Płatność zsynchronizowana z Apilo (apilo_order_id: ' . $order['apilo_order_id'] . ')'
|
||||
: 'Błąd synchronizacji płatności (HTTP: ' . $http_code . ($curl_error ? ', cURL: ' . $curl_error : '') . ')',
|
||||
[
|
||||
'apilo_order_id' => $order['apilo_order_id'],
|
||||
'http_code' => $http_code,
|
||||
'curl_error' => $curl_error,
|
||||
'response' => json_decode((string)$apilo_response, true),
|
||||
]
|
||||
);
|
||||
|
||||
if ($curl_error !== '') return false;
|
||||
if ($http_code < 200 || $http_code >= 300) return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private function syncApiloStatus(array $order, int $status): bool
|
||||
public function syncApiloStatus(array $order, int $status): bool
|
||||
{
|
||||
global $config;
|
||||
|
||||
$db = $this->orders->getDb();
|
||||
$integrationsRepository = new \Domain\Integrations\IntegrationsRepository($db);
|
||||
|
||||
if (!(int)$order['apilo_order_id']) {
|
||||
if (empty($order['apilo_order_id'])) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -721,6 +775,24 @@ class OrderAdminService
|
||||
self::appendApiloLog("STATUS RESPONSE\nHTTP: " . $http_code . "\nCURL: " . $curl_error . "\n" . print_r($apilo_result, true));
|
||||
}
|
||||
|
||||
$success = ($curl_error === '' && $http_code >= 200 && $http_code < 300);
|
||||
|
||||
\Domain\Integrations\ApiloLogger::log(
|
||||
$db,
|
||||
'status_sync',
|
||||
(int)$order['id'],
|
||||
$success
|
||||
? 'Status zsynchronizowany z Apilo (apilo_order_id: ' . $order['apilo_order_id'] . ', status: ' . $status . ')'
|
||||
: 'Błąd synchronizacji statusu (HTTP: ' . $http_code . ($curl_error ? ', cURL: ' . $curl_error : '') . ')',
|
||||
[
|
||||
'apilo_order_id' => $order['apilo_order_id'],
|
||||
'status' => $status,
|
||||
'http_code' => $http_code,
|
||||
'curl_error' => $curl_error,
|
||||
'response' => json_decode((string)$apilo_result, true),
|
||||
]
|
||||
);
|
||||
|
||||
if ($curl_error !== '') return false;
|
||||
if ($http_code < 200 || $http_code >= 300) return false;
|
||||
|
||||
@@ -728,59 +800,42 @@ class OrderAdminService
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Private: Apilo sync queue file helpers
|
||||
// Private: Apilo sync queue (DB-based via CronJobRepository)
|
||||
// =========================================================================
|
||||
|
||||
private static function queueApiloSync(int $order_id, bool $payment, ?int $status, string $error): void
|
||||
private function queueApiloSync(int $order_id, bool $payment, ?int $status, string $error): void
|
||||
{
|
||||
if ($order_id <= 0) return;
|
||||
|
||||
$queue = self::loadApiloSyncQueue();
|
||||
$key = (string)$order_id;
|
||||
$row = is_array($queue[$key] ?? null) ? $queue[$key] : [];
|
||||
if ($this->cronJobRepo === null) return;
|
||||
|
||||
if ($payment) {
|
||||
$jobType = \Domain\CronJob\CronJobType::APILO_SYNC_PAYMENT;
|
||||
$payload = ['order_id' => $order_id];
|
||||
|
||||
if (!$this->cronJobRepo->hasPendingJob($jobType, $payload)) {
|
||||
$this->cronJobRepo->enqueue(
|
||||
$jobType,
|
||||
$payload,
|
||||
\Domain\CronJob\CronJobType::PRIORITY_HIGH,
|
||||
50
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
$row['order_id'] = $order_id;
|
||||
$row['payment'] = !empty($row['payment']) || $payment ? 1 : 0;
|
||||
if ($status !== null) {
|
||||
$row['status'] = $status;
|
||||
$jobType = \Domain\CronJob\CronJobType::APILO_SYNC_STATUS;
|
||||
$payload = ['order_id' => $order_id, 'status' => $status];
|
||||
|
||||
if (!$this->cronJobRepo->hasPendingJob($jobType, $payload)) {
|
||||
$this->cronJobRepo->enqueue(
|
||||
$jobType,
|
||||
$payload,
|
||||
\Domain\CronJob\CronJobType::PRIORITY_HIGH,
|
||||
50
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
$row['attempts'] = (int)($row['attempts'] ?? 0) + 1;
|
||||
$row['last_error'] = $error;
|
||||
$row['updated_at'] = date('Y-m-d H:i:s');
|
||||
|
||||
$queue[$key] = $row;
|
||||
self::saveApiloSyncQueue($queue);
|
||||
}
|
||||
|
||||
private static function apiloSyncQueuePath(): string
|
||||
{
|
||||
return dirname(__DIR__, 2) . self::APILO_SYNC_QUEUE_FILE;
|
||||
}
|
||||
|
||||
private static function loadApiloSyncQueue(): array
|
||||
{
|
||||
$path = self::apiloSyncQueuePath();
|
||||
if (!file_exists($path)) return [];
|
||||
|
||||
$content = file_get_contents($path);
|
||||
if (!$content) return [];
|
||||
|
||||
$decoded = json_decode($content, true);
|
||||
if (!is_array($decoded)) return [];
|
||||
|
||||
return $decoded;
|
||||
}
|
||||
|
||||
private static function saveApiloSyncQueue(array $queue): void
|
||||
{
|
||||
$path = self::apiloSyncQueuePath();
|
||||
$dir = dirname($path);
|
||||
if (!is_dir($dir)) {
|
||||
mkdir($dir, 0777, true);
|
||||
}
|
||||
|
||||
file_put_contents($path, json_encode($queue, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES), LOCK_EX);
|
||||
}
|
||||
|
||||
private static function appendApiloLog(string $message): void
|
||||
|
||||
@@ -245,25 +245,43 @@ class OrderRepository
|
||||
|
||||
public function orderStatuses(): array
|
||||
{
|
||||
$rows = $this->db->select('pp_shop_statuses', ['id', 'status'], [
|
||||
$data = $this->orderStatusData();
|
||||
return $data['names'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Zwraca nazwy i kolory statusów w jednym zapytaniu.
|
||||
*
|
||||
* @return array{names: array<int, string>, colors: array<int, string>}
|
||||
*/
|
||||
public function orderStatusData(): array
|
||||
{
|
||||
$rows = $this->db->select('pp_shop_statuses', ['id', 'status', 'color'], [
|
||||
'ORDER' => ['o' => 'ASC'],
|
||||
]);
|
||||
|
||||
$names = [];
|
||||
$colors = [];
|
||||
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
return ['names' => $names, 'colors' => $colors];
|
||||
}
|
||||
|
||||
$result = [];
|
||||
foreach ($rows as $row) {
|
||||
$id = (int)($row['id'] ?? 0);
|
||||
if ($id < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$result[$id] = (string)($row['status'] ?? '');
|
||||
$names[$id] = (string)($row['status'] ?? '');
|
||||
|
||||
$color = trim((string)($row['color'] ?? ''));
|
||||
if ($color !== '' && preg_match('/^#[0-9a-fA-F]{3,6}$/', $color)) {
|
||||
$colors[$id] = $color;
|
||||
}
|
||||
}
|
||||
|
||||
return $result;
|
||||
return ['names' => $names, 'colors' => $colors];
|
||||
}
|
||||
|
||||
public function nextOrderId(int $orderId): ?int
|
||||
|
||||
Reference in New Issue
Block a user