$handlers */ public function __construct( private readonly CronRepository $repository, private readonly Logger $logger, private readonly array $handlers ) { } /** * @return array */ public function run(int $limit): array { $safeLimit = max(1, min(100, $limit)); $now = new DateTimeImmutable('now'); $dispatched = $this->dispatchDueSchedules($now); $processed = 0; $completed = 0; $failed = 0; while ($processed < $safeLimit) { $job = $this->repository->claimNextPendingJob(new DateTimeImmutable('now')); if ($job === null) { break; } $processed++; $jobId = (int) ($job['id'] ?? 0); $jobType = (string) ($job['job_type'] ?? ''); try { $result = $this->handleJob($jobType, is_array($job['payload'] ?? null) ? $job['payload'] : []); $this->repository->markJobCompleted($jobId, $result); $completed++; } catch (Throwable $exception) { $this->repository->markJobFailed($jobId, $exception->getMessage(), new DateTimeImmutable('now'), 60); $this->logger->error('Cron job failed', [ 'job_id' => $jobId, 'job_type' => $jobType, 'error' => $exception->getMessage(), ]); $failed++; } } return [ 'dispatched' => $dispatched, 'processed' => $processed, 'completed' => $completed, 'failed' => $failed, ]; } private function dispatchDueSchedules(DateTimeImmutable $now): int { $schedules = $this->repository->findDueSchedules($now); $count = 0; foreach ($schedules as $schedule) { $this->repository->enqueueJobFromSchedule($schedule, $now); $count++; } return $count; } /** * @param array $payload * @return array */ private function handleJob(string $jobType, array $payload): array { $handler = $this->handlers[$jobType] ?? null; if ($handler === null || !method_exists($handler, 'handle')) { throw new RuntimeException('Brak handlera dla typu joba: ' . $jobType); } $result = $handler->handle($payload); if (!is_array($result)) { return ['ok' => true]; } return $result; } }