feat: database-backed cron job queue replacing JSON file system

Replace file-based JSON cron queue with DB-backed job queue (pp_cron_jobs,
pp_cron_schedules). New Domain\CronJob module: CronJobType (constants),
CronJobRepository (CRUD, atomic fetch, retry/backoff), CronJobProcessor
(orchestration with handler registration). Priority ordering guarantees
apilo_send_order (40) runs before sync tasks (50). Includes cron.php auth
protection, race condition fix in fetchNext, API response validation,
and DI wiring across all entry points. 41 new tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 13:29:11 +01:00
parent 97d7473753
commit 52119a0724
19 changed files with 1723 additions and 417 deletions

View File

@@ -0,0 +1,140 @@
<?php
namespace Domain\CronJob;
class CronJobProcessor
{
/** @var CronJobRepository */
private $cronRepo;
/** @var array<string, callable> */
private $handlers = [];
/**
* @param CronJobRepository $cronRepo
*/
public function __construct(CronJobRepository $cronRepo)
{
$this->cronRepo = $cronRepo;
}
/**
* Zarejestruj handler dla typu zadania
*
* @param string $jobType
* @param callable $handler fn($payload): bool|array — true/array = success, false/exception = fail
*/
public function registerHandler($jobType, callable $handler)
{
$this->handlers[$jobType] = $handler;
}
/**
* Utwórz zadania z harmonogramów, których next_run_at <= NOW
*
* @return int Liczba utworzonych zadań
*/
public function createScheduledJobs()
{
$schedules = $this->cronRepo->getDueSchedules();
$created = 0;
foreach ($schedules as $schedule) {
$jobType = $schedule['job_type'];
// Nie twórz duplikatów
if ($this->cronRepo->hasPendingJob($jobType)) {
// Mimo duplikatu, przesuń next_run_at żeby nie sprawdzać co sekundę
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
continue;
}
$payload = null;
if (!empty($schedule['payload'])) {
$payload = json_decode($schedule['payload'], true);
}
$this->cronRepo->enqueue(
$jobType,
$payload,
(int) $schedule['priority'],
(int) $schedule['max_attempts']
);
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
$created++;
}
return $created;
}
/**
* Przetwórz kolejkę zadań
*
* @param int $limit
* @return array Statystyki: ['processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
*/
public function processQueue($limit = 10)
{
$stats = ['processed' => 0, 'succeeded' => 0, 'failed' => 0, 'skipped' => 0];
$jobs = $this->cronRepo->fetchNext($limit);
foreach ($jobs as $job) {
$jobType = $job['job_type'];
$jobId = (int) $job['id'];
$stats['processed']++;
if (!isset($this->handlers[$jobType])) {
$this->cronRepo->markFailed($jobId, 'No handler registered for job type: ' . $jobType, (int) $job['attempts']);
$stats['skipped']++;
continue;
}
try {
$result = call_user_func($this->handlers[$jobType], $job['payload']);
if ($result === false) {
$this->cronRepo->markFailed($jobId, 'Handler returned false', (int) $job['attempts']);
$stats['failed']++;
} else {
$resultData = is_array($result) ? $result : null;
$this->cronRepo->markCompleted($jobId, $resultData);
$stats['succeeded']++;
}
} catch (\Exception $e) {
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
$stats['failed']++;
} catch (\Throwable $e) {
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
$stats['failed']++;
}
}
return $stats;
}
/**
* Główna metoda: utwórz scheduled jobs + przetwórz kolejkę
*
* @param int $limit
* @return array ['scheduled' => int, 'processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
*/
public function run($limit = 20)
{
// Odzyskaj stuck jobs
$this->cronRepo->recoverStuck(30);
// Utwórz zadania z harmonogramów
$scheduled = $this->createScheduledJobs();
// Przetwórz kolejkę
$stats = $this->processQueue($limit);
$stats['scheduled'] = $scheduled;
// Cleanup starych zadań (raz na uruchomienie)
$this->cronRepo->cleanup(30);
return $stats;
}
}

View File

@@ -0,0 +1,248 @@
<?php
namespace Domain\CronJob;
class CronJobRepository
{
/** @var \medoo */
private $db;
/**
* @param \medoo $db
*/
public function __construct($db)
{
$this->db = $db;
}
/**
* Dodaj zadanie do kolejki
*
* @param string $jobType
* @param array|null $payload
* @param int $priority
* @param int $maxAttempts
* @param string|null $scheduledAt
* @return int|null ID nowego zadania
*/
public function enqueue($jobType, $payload = null, $priority = CronJobType::PRIORITY_NORMAL, $maxAttempts = 10, $scheduledAt = null)
{
$data = [
'job_type' => $jobType,
'status' => CronJobType::STATUS_PENDING,
'priority' => $priority,
'max_attempts' => $maxAttempts,
'scheduled_at' => $scheduledAt ? $scheduledAt : date('Y-m-d H:i:s'),
];
if ($payload !== null) {
$data['payload'] = json_encode($payload);
}
$this->db->insert('pp_cron_jobs', $data);
$id = $this->db->id();
return $id ? (int) $id : null;
}
/**
* Atomowe pobranie następnych zadań do przetworzenia.
*
* Uwaga: SELECT + UPDATE nie jest w pełni atomowe bez transakcji.
* Po UPDATE re-SELECT potwierdza, które joby zostały faktycznie przejęte
* (chroni przed race condition przy wielu workerach).
*
* @param int $limit
* @return array
*/
public function fetchNext($limit = 5)
{
$now = date('Y-m-d H:i:s');
$jobs = $this->db->select('pp_cron_jobs', '*', [
'status' => CronJobType::STATUS_PENDING,
'scheduled_at[<=]' => $now,
'ORDER' => ['priority' => 'ASC', 'scheduled_at' => 'ASC'],
'LIMIT' => $limit,
]);
if (empty($jobs)) {
return [];
}
$ids = array_column($jobs, 'id');
$this->db->update('pp_cron_jobs', [
'status' => CronJobType::STATUS_PROCESSING,
'started_at' => $now,
'attempts[+]' => 1,
], [
'id' => $ids,
'status' => CronJobType::STATUS_PENDING,
]);
// Re-SELECT: potwierdź, które joby zostały faktycznie przejęte
$claimed = $this->db->select('pp_cron_jobs', '*', [
'id' => $ids,
'status' => CronJobType::STATUS_PROCESSING,
'started_at' => $now,
]);
if (empty($claimed)) {
return [];
}
foreach ($claimed as &$job) {
if ($job['payload'] !== null) {
$job['payload'] = json_decode($job['payload'], true);
}
}
return $claimed;
}
/**
* Oznacz zadanie jako zakończone
*
* @param int $jobId
* @param mixed $result
*/
public function markCompleted($jobId, $result = null)
{
$data = [
'status' => CronJobType::STATUS_COMPLETED,
'completed_at' => date('Y-m-d H:i:s'),
];
if ($result !== null) {
$data['result'] = json_encode($result);
}
$this->db->update('pp_cron_jobs', $data, ['id' => $jobId]);
}
/**
* Oznacz zadanie jako nieudane z backoffem
*
* @param int $jobId
* @param string $error
* @param int $attempt Numer próby (do obliczenia backoffu)
*/
public function markFailed($jobId, $error, $attempt = 1)
{
$job = $this->db->get('pp_cron_jobs', ['max_attempts', 'attempts'], ['id' => $jobId]);
$attempts = $job ? (int) $job['attempts'] : $attempt;
$maxAttempts = $job ? (int) $job['max_attempts'] : 10;
if ($attempts >= $maxAttempts) {
// Przekroczono limit prób — trwale failed
$this->db->update('pp_cron_jobs', [
'status' => CronJobType::STATUS_FAILED,
'last_error' => mb_substr($error, 0, 500),
'completed_at' => date('Y-m-d H:i:s'),
], ['id' => $jobId]);
} else {
// Wróć do pending z backoffem
$backoff = CronJobType::calculateBackoff($attempts);
$nextRun = date('Y-m-d H:i:s', time() + $backoff);
$this->db->update('pp_cron_jobs', [
'status' => CronJobType::STATUS_PENDING,
'last_error' => mb_substr($error, 0, 500),
'scheduled_at' => $nextRun,
], ['id' => $jobId]);
}
}
/**
* Sprawdź czy istnieje pending job danego typu z opcjonalnym payload match
*
* @param string $jobType
* @param array|null $payloadMatch
* @return bool
*/
public function hasPendingJob($jobType, $payloadMatch = null)
{
$where = [
'job_type' => $jobType,
'status' => [CronJobType::STATUS_PENDING, CronJobType::STATUS_PROCESSING],
];
if ($payloadMatch !== null) {
$where['payload'] = json_encode($payloadMatch);
}
$count = $this->db->count('pp_cron_jobs', $where);
return $count > 0;
}
/**
* Wyczyść stare zakończone zadania
*
* @param int $olderThanDays
*/
public function cleanup($olderThanDays = 30)
{
$cutoff = date('Y-m-d H:i:s', time() - ($olderThanDays * 86400));
$this->db->delete('pp_cron_jobs', [
'status' => [CronJobType::STATUS_COMPLETED, CronJobType::STATUS_FAILED, CronJobType::STATUS_CANCELLED],
'updated_at[<]' => $cutoff,
]);
}
/**
* Odzyskaj zablokowane zadania (stuck w processing)
*
* @param int $olderThanMinutes
*/
public function recoverStuck($olderThanMinutes = 30)
{
$cutoff = date('Y-m-d H:i:s', time() - ($olderThanMinutes * 60));
$this->db->update('pp_cron_jobs', [
'status' => CronJobType::STATUS_PENDING,
'started_at' => null,
], [
'status' => CronJobType::STATUS_PROCESSING,
'started_at[<]' => $cutoff,
]);
}
/**
* Pobierz harmonogramy gotowe do uruchomienia
*
* @return array
*/
public function getDueSchedules()
{
$now = date('Y-m-d H:i:s');
return $this->db->select('pp_cron_schedules', '*', [
'enabled' => 1,
'OR' => [
'next_run_at' => null,
'next_run_at[<=]' => $now,
],
'ORDER' => ['priority' => 'ASC'],
]);
}
/**
* Aktualizuj harmonogram po uruchomieniu
*
* @param int $scheduleId
* @param int $intervalSeconds
*/
public function touchSchedule($scheduleId, $intervalSeconds)
{
$now = date('Y-m-d H:i:s');
$nextRun = date('Y-m-d H:i:s', time() + $intervalSeconds);
$this->db->update('pp_cron_schedules', [
'last_run_at' => $now,
'next_run_at' => $nextRun,
], ['id' => $scheduleId]);
}
}

View File

@@ -0,0 +1,81 @@
<?php
namespace Domain\CronJob;
class CronJobType
{
// Job types
const APILO_TOKEN_KEEPALIVE = 'apilo_token_keepalive';
const APILO_SEND_ORDER = 'apilo_send_order';
const APILO_SYNC_PAYMENT = 'apilo_sync_payment';
const APILO_SYNC_STATUS = 'apilo_sync_status';
const APILO_PRODUCT_SYNC = 'apilo_product_sync';
const APILO_PRICELIST_SYNC = 'apilo_pricelist_sync';
const APILO_STATUS_POLL = 'apilo_status_poll';
const PRICE_HISTORY = 'price_history';
const ORDER_ANALYSIS = 'order_analysis';
const TRUSTMATE_INVITATION = 'trustmate_invitation';
const GOOGLE_XML_FEED = 'google_xml_feed';
// Priorities (lower = more important)
const PRIORITY_CRITICAL = 10;
const PRIORITY_SEND_ORDER = 40; // apilo_send_order musi być PRZED sync payment/status
const PRIORITY_HIGH = 50;
const PRIORITY_NORMAL = 100;
const PRIORITY_LOW = 200;
// Statuses
const STATUS_PENDING = 'pending';
const STATUS_PROCESSING = 'processing';
const STATUS_COMPLETED = 'completed';
const STATUS_FAILED = 'failed';
const STATUS_CANCELLED = 'cancelled';
// Backoff
const BASE_BACKOFF_SECONDS = 60;
const MAX_BACKOFF_SECONDS = 3600;
/**
* @return string[]
*/
public static function allTypes()
{
return [
self::APILO_TOKEN_KEEPALIVE,
self::APILO_SEND_ORDER,
self::APILO_SYNC_PAYMENT,
self::APILO_SYNC_STATUS,
self::APILO_PRODUCT_SYNC,
self::APILO_PRICELIST_SYNC,
self::APILO_STATUS_POLL,
self::PRICE_HISTORY,
self::ORDER_ANALYSIS,
self::TRUSTMATE_INVITATION,
self::GOOGLE_XML_FEED,
];
}
/**
* @return string[]
*/
public static function allStatuses()
{
return [
self::STATUS_PENDING,
self::STATUS_PROCESSING,
self::STATUS_COMPLETED,
self::STATUS_FAILED,
self::STATUS_CANCELLED,
];
}
/**
* @param int $attempt
* @return int
*/
public static function calculateBackoff($attempt)
{
$backoff = self::BASE_BACKOFF_SECONDS * pow(2, $attempt - 1);
return min($backoff, self::MAX_BACKOFF_SECONDS);
}
}

View File

@@ -7,17 +7,21 @@ class OrderAdminService
private $productRepo;
private $settingsRepo;
private $transportRepo;
/** @var \Domain\CronJob\CronJobRepository|null */
private $cronJobRepo;
public function __construct(
OrderRepository $orders,
$productRepo = null,
$settingsRepo = null,
$transportRepo = null
$transportRepo = null,
$cronJobRepo = null
) {
$this->orders = $orders;
$this->productRepo = $productRepo;
$this->settingsRepo = $settingsRepo;
$this->transportRepo = $transportRepo;
$this->cronJobRepo = $cronJobRepo;
}
public function details(int $orderId): array
@@ -519,92 +523,6 @@ class OrderAdminService
return $this->orders->deleteOrder($orderId);
}
// =========================================================================
// Apilo sync queue (migrated from \shop\Order)
// =========================================================================
private const APILO_SYNC_QUEUE_FILE = '/temp/apilo-sync-queue.json';
public function processApiloSyncQueue(int $limit = 10): int
{
$queue = self::loadApiloSyncQueue();
if (!\Shared\Helpers\Helpers::is_array_fix($queue)) {
return 0;
}
$processed = 0;
foreach ($queue as $key => $task)
{
if ($processed >= $limit) {
break;
}
$order_id = (int)($task['order_id'] ?? 0);
if ($order_id <= 0) {
unset($queue[$key]);
continue;
}
$order = $this->orders->findRawById($order_id);
if (!$order) {
unset($queue[$key]);
continue;
}
$error = '';
$sync_failed = false;
$max_attempts = 50; // ~8h przy cronie co 10 min
// Zamówienie jeszcze nie wysłane do Apilo — czekaj na crona
if (!(int)$order['apilo_order_id']) {
$attempts = (int)($task['attempts'] ?? 0) + 1;
if ($attempts >= $max_attempts) {
// Przekroczono limit prób — porzuć task
unset($queue[$key]);
} else {
$task['attempts'] = $attempts;
$task['last_error'] = 'awaiting_apilo_order';
$task['updated_at'] = date('Y-m-d H:i:s');
$queue[$key] = $task;
}
$processed++;
continue;
}
$payment_pending = !empty($task['payment']) && (int)$order['paid'] === 1;
if ($payment_pending) {
if (!$this->syncApiloPayment($order)) {
$sync_failed = true;
$error = 'payment_sync_failed';
}
}
$status_pending = isset($task['status']) && $task['status'] !== null && $task['status'] !== '';
if (!$sync_failed && $status_pending) {
if (!$this->syncApiloStatus($order, (int)$task['status'])) {
$sync_failed = true;
$error = 'status_sync_failed';
}
}
if ($sync_failed) {
$task['attempts'] = (int)($task['attempts'] ?? 0) + 1;
$task['last_error'] = $error;
$task['updated_at'] = date('Y-m-d H:i:s');
$queue[$key] = $task;
} else {
unset($queue[$key]);
}
$processed++;
}
self::saveApiloSyncQueue($queue);
return $processed;
}
// =========================================================================
// Private: email
// =========================================================================
@@ -689,7 +607,7 @@ class OrderAdminService
'Brak apilo_order_id — płatność zakolejkowana do sync',
['apilo_order_id' => $order['apilo_order_id'] ?? null]
);
self::queueApiloSync((int)$order['id'], true, null, 'awaiting_apilo_order');
$this->queueApiloSync((int)$order['id'], true, null, 'awaiting_apilo_order');
} elseif (!$this->syncApiloPayment($order)) {
\Domain\Integrations\ApiloLogger::log(
$db,
@@ -698,7 +616,7 @@ class OrderAdminService
'Sync płatności nieudany — zakolejkowano ponowną próbę',
['apilo_order_id' => $order['apilo_order_id']]
);
self::queueApiloSync((int)$order['id'], true, null, 'payment_sync_failed');
$this->queueApiloSync((int)$order['id'], true, null, 'payment_sync_failed');
}
}
@@ -739,7 +657,7 @@ class OrderAdminService
'Brak apilo_order_id — status zakolejkowany do sync',
['apilo_order_id' => $order['apilo_order_id'] ?? null, 'target_status' => $status]
);
self::queueApiloSync((int)$order['id'], false, $status, 'awaiting_apilo_order');
$this->queueApiloSync((int)$order['id'], false, $status, 'awaiting_apilo_order');
} elseif (!$this->syncApiloStatus($order, $status)) {
\Domain\Integrations\ApiloLogger::log(
$db,
@@ -748,11 +666,11 @@ class OrderAdminService
'Sync statusu nieudany — zakolejkowano ponowną próbę',
['apilo_order_id' => $order['apilo_order_id'], 'target_status' => $status]
);
self::queueApiloSync((int)$order['id'], false, $status, 'status_sync_failed');
$this->queueApiloSync((int)$order['id'], false, $status, 'status_sync_failed');
}
}
private function syncApiloPayment(array $order): bool
public function syncApiloPayment(array $order): bool
{
global $config;
@@ -819,7 +737,7 @@ class OrderAdminService
return true;
}
private function syncApiloStatus(array $order, int $status): bool
public function syncApiloStatus(array $order, int $status): bool
{
global $config;
@@ -882,59 +800,42 @@ class OrderAdminService
}
// =========================================================================
// Private: Apilo sync queue file helpers
// Private: Apilo sync queue (DB-based via CronJobRepository)
// =========================================================================
private static function queueApiloSync(int $order_id, bool $payment, ?int $status, string $error): void
private function queueApiloSync(int $order_id, bool $payment, ?int $status, string $error): void
{
if ($order_id <= 0) return;
$queue = self::loadApiloSyncQueue();
$key = (string)$order_id;
$row = is_array($queue[$key] ?? null) ? $queue[$key] : [];
if ($this->cronJobRepo === null) return;
if ($payment) {
$jobType = \Domain\CronJob\CronJobType::APILO_SYNC_PAYMENT;
$payload = ['order_id' => $order_id];
if (!$this->cronJobRepo->hasPendingJob($jobType, $payload)) {
$this->cronJobRepo->enqueue(
$jobType,
$payload,
\Domain\CronJob\CronJobType::PRIORITY_HIGH,
50
);
}
}
$row['order_id'] = $order_id;
$row['payment'] = !empty($row['payment']) || $payment ? 1 : 0;
if ($status !== null) {
$row['status'] = $status;
$jobType = \Domain\CronJob\CronJobType::APILO_SYNC_STATUS;
$payload = ['order_id' => $order_id, 'status' => $status];
if (!$this->cronJobRepo->hasPendingJob($jobType, $payload)) {
$this->cronJobRepo->enqueue(
$jobType,
$payload,
\Domain\CronJob\CronJobType::PRIORITY_HIGH,
50
);
}
}
$row['attempts'] = (int)($row['attempts'] ?? 0) + 1;
$row['last_error'] = $error;
$row['updated_at'] = date('Y-m-d H:i:s');
$queue[$key] = $row;
self::saveApiloSyncQueue($queue);
}
private static function apiloSyncQueuePath(): string
{
return dirname(__DIR__, 2) . self::APILO_SYNC_QUEUE_FILE;
}
private static function loadApiloSyncQueue(): array
{
$path = self::apiloSyncQueuePath();
if (!file_exists($path)) return [];
$content = file_get_contents($path);
if (!$content) return [];
$decoded = json_decode($content, true);
if (!is_array($decoded)) return [];
return $decoded;
}
private static function saveApiloSyncQueue(array $queue): void
{
$path = self::apiloSyncQueuePath();
$dir = dirname($path);
if (!is_dir($dir)) {
mkdir($dir, 0777, true);
}
file_put_contents($path, json_encode($queue, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES), LOCK_EX);
}
private static function appendApiloLog(string $message): void