*/
    private $handlers = [];

    /**
     * Store the repository used for every schedule and queue operation.
     *
     * @param CronJobRepository $cronRepo
     */
    public function __construct(CronJobRepository $cronRepo)
    {
        $this->cronRepo = $cronRepo;
    }

    /**
     * Register the handler for a job type.
     *
     * Registering the same job type again replaces the previous handler.
     *
     * @param string   $jobType
     * @param callable $handler Invoked with the job payload. Returning exactly false,
     *                          or throwing, marks the job as failed; any other return
     *                          value marks it as completed, and an array return value
     *                          is passed on as the job result.
     */
    public function registerHandler($jobType, callable $handler)
    {
        $this->handlers[$jobType] = $handler;
    }

    /**
     * Enqueue one job for every schedule the repository reports as due.
     *
     * A schedule whose job type already has a pending job gets no new job.
     * Every due schedule is touched with its interval regardless of whether
     * a job was created.
     *
     * @return int Number of jobs created
     */
    public function createScheduledJobs()
    {
        $createdCount = 0;

        foreach ($this->cronRepo->getDueSchedules() as $schedule) {
            $type = $schedule['job_type'];

            // Only enqueue when no job of this type is already waiting.
            if (!$this->cronRepo->hasPendingJob($type)) {
                // An empty payload column is forwarded as null instead of being decoded.
                $payload = empty($schedule['payload'])
                    ? null
                    : json_decode($schedule['payload'], true);

                $this->cronRepo->enqueue(
                    $type,
                    $payload,
                    (int) $schedule['priority'],
                    (int) $schedule['max_attempts']
                );
                $createdCount++;
            }

            // Done for duplicates as well. Per the original author's note this pushes
            // next_run_at forward so the schedule is not re-checked every second
            // (touchSchedule itself is not visible here — confirm in the repository).
            $this->cronRepo->touchSchedule($schedule['id'], (int) $schedule['interval_seconds']);
        }

        return $createdCount;
    }

    /**
     * Run the registered handlers against the next jobs in the queue.
     *
     * Jobs whose type has no registered handler are marked as failed in the
     * repository but counted under 'skipped'. Every fetched job counts as
     * 'processed', including skipped ones.
     *
     * @param int $limit Passed to the repository when fetching jobs
     * @return array Statistics: ['processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int]
     */
    public function processQueue($limit = 10)
    {
        $stats = array_fill_keys(['processed', 'succeeded', 'failed', 'skipped'], 0);

        foreach ($this->cronRepo->fetchNext($limit) as $job) {
            $type = $job['job_type'];
            $id = (int) $job['id'];
            $stats['processed']++;

            if (!isset($this->handlers[$type])) {
                $this->cronRepo->markFailed(
                    $id,
                    'No handler registered for job type: ' . $type,
                    (int) $job['attempts']
                );
                $stats['skipped']++;
                continue;
            }

            $error = null;

            try {
                $outcome = call_user_func($this->handlers[$type], $job['payload']);

                if ($outcome !== false) {
                    // Only array results are stored; any other success value becomes null.
                    $this->cronRepo->markCompleted($id, is_array($outcome) ? $outcome : null);
                    $stats['succeeded']++;
                } else {
                    $this->cronRepo->markFailed($id, 'Handler returned false', (int) $job['attempts']);
                    $stats['failed']++;
                }
            } catch (\Exception $e) {
                // Separate clause so exceptions are still caught where \Throwable does not exist.
                $error = $e;
            } catch (\Throwable $e) {
                $error = $e;
            }

            // Recorded outside the try so a failure of markFailed itself propagates to the caller.
            if ($error !== null) {
                $this->cronRepo->markFailed($id, $error->getMessage(), (int) $job['attempts']);
                $stats['failed']++;
            }
        }

        return $stats;
    }

    /**
     * Main entry point: recover stuck jobs, create scheduled jobs, process the
     * queue, then clean up old jobs.
     *
     * @param int $limit Maximum number of jobs handed to processQueue()
     * @return array ['processed' => int, 'succeeded' => int, 'failed' => int, 'skipped' => int, 'scheduled' => int]
     */
    public function run($limit = 20)
    {
        // NOTE(review): the unit of this threshold (30) is defined by the repository — confirm.
        $this->cronRepo->recoverStuck(30);

        $scheduledCount = $this->createScheduledJobs();

        $summary = $this->processQueue($limit);
        $summary['scheduled'] = $scheduledCount;

        // Old-job cleanup, once per run. NOTE(review): unit of 30 is not visible here — confirm.
        $this->cronRepo->cleanup(30);

        return $summary;
    }
}