Files
shopPRO/autoload/Domain/CronJob/CronJobProcessor.php
Jacek Pyziak 52119a0724 feat: database-backed cron job queue replacing JSON file system
Replace file-based JSON cron queue with DB-backed job queue (pp_cron_jobs,
pp_cron_schedules). New Domain\CronJob module: CronJobType (constants),
CronJobRepository (CRUD, atomic fetch, retry/backoff), CronJobProcessor
(orchestration with handler registration). Priority ordering guarantees
apilo_send_order (40) runs before sync tasks (50). Includes cron.php auth
protection, race condition fix in fetchNext, API response validation,
and DI wiring across all entry points. 41 new tests.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 13:29:11 +01:00

141 lines
4.2 KiB
PHP

<?php
namespace Domain\CronJob;
class CronJobProcessor
{
/** @var CronJobRepository */
private $cronRepo;
/** @var array<string, callable> */
private $handlers = [];
/**
* @param CronJobRepository $cronRepo
*/
public function __construct(CronJobRepository $cronRepo)
{
$this->cronRepo = $cronRepo;
}
/**
* Zarejestruj handler dla typu zadania
*
* @param string $jobType
* @param callable $handler fn($payload): bool|array — true/array = success, false/exception = fail
*/
public function registerHandler($jobType, callable $handler)
{
$this->handlers[$jobType] = $handler;
}
/**
* Utwórz zadania z harmonogramów, których next_run_at <= NOW
*
* @return int Liczba utworzonych zadań
*/
public function createScheduledJobs()
{
$schedules = $this->cronRepo->getDueSchedules();
$created = 0;
foreach ($schedules as $schedule) {
$jobType = $schedule['job_type'];
// Nie twórz duplikatów
if ($this->cronRepo->hasPendingJob($jobType)) {
// Mimo duplikatu, przesuń next_run_at żeby nie sprawdzać co sekundę
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
continue;
}
$payload = null;
if (!empty($schedule['payload'])) {
$payload = json_decode($schedule['payload'], true);
}
$this->cronRepo->enqueue(
$jobType,
$payload,
(int) $schedule['priority'],
(int) $schedule['max_attempts']
);
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
$created++;
}
return $created;
}
/**
* Przetwórz kolejkę zadań
*
* @param int $limit
* @return array Statystyki: ['processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
*/
public function processQueue($limit = 10)
{
$stats = ['processed' => 0, 'succeeded' => 0, 'failed' => 0, 'skipped' => 0];
$jobs = $this->cronRepo->fetchNext($limit);
foreach ($jobs as $job) {
$jobType = $job['job_type'];
$jobId = (int) $job['id'];
$stats['processed']++;
if (!isset($this->handlers[$jobType])) {
$this->cronRepo->markFailed($jobId, 'No handler registered for job type: ' . $jobType, (int) $job['attempts']);
$stats['skipped']++;
continue;
}
try {
$result = call_user_func($this->handlers[$jobType], $job['payload']);
if ($result === false) {
$this->cronRepo->markFailed($jobId, 'Handler returned false', (int) $job['attempts']);
$stats['failed']++;
} else {
$resultData = is_array($result) ? $result : null;
$this->cronRepo->markCompleted($jobId, $resultData);
$stats['succeeded']++;
}
} catch (\Exception $e) {
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
$stats['failed']++;
} catch (\Throwable $e) {
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
$stats['failed']++;
}
}
return $stats;
}
/**
* Główna metoda: utwórz scheduled jobs + przetwórz kolejkę
*
* @param int $limit
* @return array ['scheduled' => int, 'processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
*/
public function run($limit = 20)
{
// Odzyskaj stuck jobs
$this->cronRepo->recoverStuck(30);
// Utwórz zadania z harmonogramów
$scheduled = $this->createScheduledJobs();
// Przetwórz kolejkę
$stats = $this->processQueue($limit);
$stats['scheduled'] = $scheduled;
// Cleanup starych zadań (raz na uruchomienie)
$this->cronRepo->cleanup(30);
return $stats;
}
}