Replace file-based JSON cron queue with DB-backed job queue (pp_cron_jobs, pp_cron_schedules). New Domain\CronJob module: CronJobType (constants), CronJobRepository (CRUD, atomic fetch, retry/backoff), CronJobProcessor (orchestration with handler registration). Priority ordering guarantees apilo_send_order (40) runs before sync tasks (50). Includes cron.php auth protection, race condition fix in fetchNext, API response validation, and DI wiring across all entry points. 41 new tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
141 lines
4.2 KiB
PHP
141 lines
4.2 KiB
PHP
<?php
|
|
|
|
namespace Domain\CronJob;
|
|
|
|
class CronJobProcessor
|
|
{
|
|
/** @var CronJobRepository */
|
|
private $cronRepo;
|
|
|
|
/** @var array<string, callable> */
|
|
private $handlers = [];
|
|
|
|
/**
|
|
* @param CronJobRepository $cronRepo
|
|
*/
|
|
public function __construct(CronJobRepository $cronRepo)
|
|
{
|
|
$this->cronRepo = $cronRepo;
|
|
}
|
|
|
|
/**
|
|
* Zarejestruj handler dla typu zadania
|
|
*
|
|
* @param string $jobType
|
|
* @param callable $handler fn($payload): bool|array — true/array = success, false/exception = fail
|
|
*/
|
|
public function registerHandler($jobType, callable $handler)
|
|
{
|
|
$this->handlers[$jobType] = $handler;
|
|
}
|
|
|
|
/**
|
|
* Utwórz zadania z harmonogramów, których next_run_at <= NOW
|
|
*
|
|
* @return int Liczba utworzonych zadań
|
|
*/
|
|
public function createScheduledJobs()
|
|
{
|
|
$schedules = $this->cronRepo->getDueSchedules();
|
|
$created = 0;
|
|
|
|
foreach ($schedules as $schedule) {
|
|
$jobType = $schedule['job_type'];
|
|
|
|
// Nie twórz duplikatów
|
|
if ($this->cronRepo->hasPendingJob($jobType)) {
|
|
// Mimo duplikatu, przesuń next_run_at żeby nie sprawdzać co sekundę
|
|
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
|
|
continue;
|
|
}
|
|
|
|
$payload = null;
|
|
if (!empty($schedule['payload'])) {
|
|
$payload = json_decode($schedule['payload'], true);
|
|
}
|
|
|
|
$this->cronRepo->enqueue(
|
|
$jobType,
|
|
$payload,
|
|
(int) $schedule['priority'],
|
|
(int) $schedule['max_attempts']
|
|
);
|
|
|
|
$this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
|
|
$created++;
|
|
}
|
|
|
|
return $created;
|
|
}
|
|
|
|
/**
|
|
* Przetwórz kolejkę zadań
|
|
*
|
|
* @param int $limit
|
|
* @return array Statystyki: ['processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
|
|
*/
|
|
public function processQueue($limit = 10)
|
|
{
|
|
$stats = ['processed' => 0, 'succeeded' => 0, 'failed' => 0, 'skipped' => 0];
|
|
|
|
$jobs = $this->cronRepo->fetchNext($limit);
|
|
|
|
foreach ($jobs as $job) {
|
|
$jobType = $job['job_type'];
|
|
$jobId = (int) $job['id'];
|
|
$stats['processed']++;
|
|
|
|
if (!isset($this->handlers[$jobType])) {
|
|
$this->cronRepo->markFailed($jobId, 'No handler registered for job type: ' . $jobType, (int) $job['attempts']);
|
|
$stats['skipped']++;
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
$result = call_user_func($this->handlers[$jobType], $job['payload']);
|
|
|
|
if ($result === false) {
|
|
$this->cronRepo->markFailed($jobId, 'Handler returned false', (int) $job['attempts']);
|
|
$stats['failed']++;
|
|
} else {
|
|
$resultData = is_array($result) ? $result : null;
|
|
$this->cronRepo->markCompleted($jobId, $resultData);
|
|
$stats['succeeded']++;
|
|
}
|
|
} catch (\Exception $e) {
|
|
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
|
|
$stats['failed']++;
|
|
} catch (\Throwable $e) {
|
|
$this->cronRepo->markFailed($jobId, $e->getMessage(), (int) $job['attempts']);
|
|
$stats['failed']++;
|
|
}
|
|
}
|
|
|
|
return $stats;
|
|
}
|
|
|
|
/**
|
|
* Główna metoda: utwórz scheduled jobs + przetwórz kolejkę
|
|
*
|
|
* @param int $limit
|
|
* @return array ['scheduled' => int, 'processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
|
|
*/
|
|
public function run($limit = 20)
|
|
{
|
|
// Odzyskaj stuck jobs
|
|
$this->cronRepo->recoverStuck(30);
|
|
|
|
// Utwórz zadania z harmonogramów
|
|
$scheduled = $this->createScheduledJobs();
|
|
|
|
// Przetwórz kolejkę
|
|
$stats = $this->processQueue($limit);
|
|
$stats['scheduled'] = $scheduled;
|
|
|
|
// Cleanup starych zadań (raz na uruchomienie)
|
|
$this->cronRepo->cleanup(30);
|
|
|
|
return $stats;
|
|
}
|
|
}
|