db = $db; }

    /**
     * Add a job to the queue.
     *
     * The row is inserted into pp_cron_jobs with CronJobType::STATUS_PENDING.
     * The payload column is only written when a payload is supplied, and it
     * is stored JSON-encoded.
     *
     * @param string      $jobType     Job type identifier.
     * @param array|null  $payload     Optional job data, stored as JSON.
     * @param int         $priority    Priority value; fetchNext() sorts by it ascending.
     * @param int         $maxAttempts Attempt limit consulted by markFailed().
     * @param string|null $scheduledAt Earliest run time ('Y-m-d H:i:s'); any falsy value means "now".
     * @return int|null ID of the new job, or null when the database reports no insert ID.
     */
    public function enqueue($jobType, $payload = null, $priority = CronJobType::PRIORITY_NORMAL, $maxAttempts = 10, $scheduledAt = null)
    {
        $data = [
            'job_type' => $jobType,
            'status' => CronJobType::STATUS_PENDING,
            'priority' => $priority,
            'max_attempts' => $maxAttempts,
            'scheduled_at' => $scheduledAt ? $scheduledAt : date('Y-m-d H:i:s'),
        ];

        if ($payload !== null) {
            $data['payload'] = json_encode($payload);
        }

        $this->db->insert('pp_cron_jobs', $data);
        $id = $this->db->id();

        return $id ? (int) $id : null;
    }

    /**
     * Claim the next due jobs for processing.
     *
     * Steps:
     *  1. SELECT pending jobs whose scheduled_at is not in the future,
     *     ordered by priority and then scheduled_at, limited to $limit.
     *  2. Claim each candidate with its own conditional UPDATE
     *     (pending -> processing, started_at = now, attempts + 1). A single
     *     UPDATE ... WHERE status = pending can only succeed for one worker,
     *     so a row whose UPDATE reports zero affected rows was taken by
     *     someone else and is skipped.
     *  3. Re-SELECT the claimed rows so the caller receives the updated
     *     values (status, started_at, attempts).
     *
     * Checking the affected-row count matters because started_at only has
     * one-second resolution: two workers running within the same second would
     * otherwise both match the "status = processing AND started_at = now"
     * confirmation query and process the same jobs twice.
     *
     * NOTE(review): the affected-row check assumes $this->db->update() returns
     * a PDOStatement (as Medoo does). When it returns anything else the row is
     * kept as a candidate and only the re-SELECT confirmation applies.
     *
     * @param int $limit Maximum number of jobs to claim.
     * @return array Claimed job rows in priority order; a non-null payload is JSON-decoded to an array.
     */
    public function fetchNext($limit = 5)
    {
        $now = date('Y-m-d H:i:s');
        $order = ['priority' => 'ASC', 'scheduled_at' => 'ASC'];

        $jobs = $this->db->select('pp_cron_jobs', '*', [
            'status' => CronJobType::STATUS_PENDING,
            'scheduled_at[<=]' => $now,
            'ORDER' => $order,
            'LIMIT' => $limit,
        ]);

        if (empty($jobs)) {
            return [];
        }

        $candidateIds = [];
        foreach (array_column($jobs, 'id') as $id) {
            $result = $this->db->update('pp_cron_jobs', [
                'status' => CronJobType::STATUS_PROCESSING,
                'started_at' => $now,
                'attempts[+]' => 1,
            ], [
                'id' => $id,
                'status' => CronJobType::STATUS_PENDING,
            ]);

            // Zero affected rows: the job was no longer pending, i.e. another
            // worker claimed it between our SELECT and this UPDATE.
            if ($result instanceof \PDOStatement && $result->rowCount() < 1) {
                continue;
            }

            $candidateIds[] = $id;
        }

        if (empty($candidateIds)) {
            return [];
        }

        // Re-SELECT: load the rows as they look after the claim. The status and
        // started_at filters are kept as a second line of defence.
        $claimed = $this->db->select('pp_cron_jobs', '*', [
            'id' => $candidateIds,
            'status' => CronJobType::STATUS_PROCESSING,
            'started_at' => $now,
            'ORDER' => $order,
        ]);

        if (empty($claimed)) {
            return [];
        }

        // Index-based write instead of a by-reference loop, so the returned
        // array does not carry a dangling reference to its last element.
        foreach ($claimed as $index => $job) {
            if ($job['payload'] !== null) {
                $claimed[$index]['payload'] = json_decode($job['payload'], true);
            }
        }

        return $claimed;
    }

    /**
     * Mark a job as completed.
     *
     * Sets the status to CronJobType::STATUS_COMPLETED and stamps completed_at
     * with the current time. The result column is only written when a result
     * is supplied, and it is stored JSON-encoded.
     *
     * @param int   $jobId
     * @param mixed $result Optional result data, stored as JSON.
     */
    public function markCompleted($jobId, $result = null)
    {
        $data = [
            'status' => CronJobType::STATUS_COMPLETED,
            'completed_at' => date('Y-m-d H:i:s'),
        ];

        if ($result !== null) {
            $data['result'] = json_encode($result);
        }

        $this->db->update('pp_cron_jobs', $data, ['id' => $jobId]);
    }

    /**
     * Record a failed run and either reschedule the job or fail it for good.
     *
     * Decision order:
     *  - Order-related Apilo jobs (CronJobType::isOrderRelatedApiloJob) are
     *    always put back to pending after
     *    CronJobType::APILO_ORDER_BACKOFF_SECONDS, regardless of the attempt
     *    count.
     *  - Any other job that has reached max_attempts becomes
     *    CronJobType::STATUS_FAILED and gets completed_at set.
     *  - Otherwise the job goes back to pending, delayed by
     *    CronJobType::calculateBackoff($attempts).
     *
     * The stored error message is truncated to 500 characters.
     *
     * @param int    $jobId
     * @param string $error   Error message to store in last_error.
     * @param int    $attempt Attempt number used only when the job row cannot be loaded;
     *                        otherwise the attempts value from the database is used.
     */
    public function markFailed($jobId, $error, $attempt = 1)
    {
        $job = $this->db->get('pp_cron_jobs', ['job_type', 'max_attempts', 'attempts'], ['id' => $jobId]);

        $attempts = $job ? (int) $job['attempts'] : $attempt;
        $maxAttempts = $job ? (int) $job['max_attempts'] : 10;
        $jobType = $job ? $job['job_type'] : '';

        $lastError = mb_substr($error, 0, 500);

        if (CronJobType::isOrderRelatedApiloJob($jobType)) {
            // Order-related Apilo jobs: retried indefinitely at a fixed interval.
            $delay = CronJobType::APILO_ORDER_BACKOFF_SECONDS;
        } elseif ($attempts >= $maxAttempts) {
            // Attempt limit reached: permanently failed.
            $this->db->update('pp_cron_jobs', [
                'status' => CronJobType::STATUS_FAILED,
                'last_error' => $lastError,
                'completed_at' => date('Y-m-d H:i:s'),
            ], ['id' => $jobId]);

            return;
        } else {
            $delay = CronJobType::calculateBackoff($attempts);
        }

        // Back to pending, postponed by the chosen delay.
        $this->db->update('pp_cron_jobs', [
            'status' => CronJobType::STATUS_PENDING,
            'last_error' => $lastError,
            'scheduled_at' => date('Y-m-d H:i:s', time() + $delay),
        ], ['id' => $jobId]);
    }

    /**
     * Check whether an unfinished job of the given type already exists.
     *
     * "Unfinished" covers both CronJobType::STATUS_PENDING and
     * CronJobType::STATUS_PROCESSING. When $payloadMatch is given, the stored
     * payload must be equal to json_encode($payloadMatch) as a string, so key
     * order and value types have to match what enqueue() stored.
     *
     * @param string     $jobType
     * @param array|null $payloadMatch Optional payload the job must have been enqueued with.
     * @return bool
     */
    public function hasPendingJob($jobType, $payloadMatch = null)
    {
        $where = [
            'job_type' => $jobType,
            'status' => [CronJobType::STATUS_PENDING, CronJobType::STATUS_PROCESSING],
        ];

        if ($payloadMatch !== null) {
            $where['payload'] = json_encode($payloadMatch);
        }

        $count = $this->db->count('pp_cron_jobs', $where);

        return $count > 0;
    }

    /**
     * Delete old finished jobs.
     *
     * Removes jobs in a completed, failed or cancelled status whose updated_at
     * is older than the cutoff.
     *
     * @param int $olderThanDays Age threshold in days.
     */
    public function cleanup($olderThanDays = 30)
    {
        $cutoff = date('Y-m-d H:i:s', time() - ($olderThanDays * 86400));

        $this->db->delete('pp_cron_jobs', [
            'status' => [CronJobType::STATUS_COMPLETED, CronJobType::STATUS_FAILED, CronJobType::STATUS_CANCELLED],
            'updated_at[<]' => $cutoff,
        ]);
    }

    /**
     * Requeue jobs that are stuck in processing.
     *
     * Any job still in CronJobType::STATUS_PROCESSING whose started_at is
     * older than the cutoff is put back to pending with started_at cleared.
     *
     * NOTE(review): attempts is not compared with max_attempts here, so a job
     * that keeps dying before markFailed() runs would be requeued every time -
     * confirm this is intended.
     *
     * @param int $olderThanMinutes Age threshold in minutes.
     */
    public function recoverStuck($olderThanMinutes = 30)
    {
        $cutoff = date('Y-m-d H:i:s', time() - ($olderThanMinutes * 60));

        $this->db->update('pp_cron_jobs', [
            'status' => CronJobType::STATUS_PENDING,
            'started_at' => null,
        ], [
            'status' => CronJobType::STATUS_PROCESSING,
            'started_at[<]' => $cutoff,
        ]);
    }

    /**
     * Fetch the schedules that are due to run.
     *
     * A schedule is due when it is enabled and its next_run_at is either
     * unset or not in the future. Results are ordered by priority ascending.
     *
     * @return array Rows from pp_cron_schedules.
     */
    public function getDueSchedules()
    {
        $now = date('Y-m-d H:i:s');

        return $this->db->select('pp_cron_schedules', '*', [
            'enabled' => 1,
            'OR' => [
                'next_run_at' => null,
                'next_run_at[<=]' => $now,
            ],
            'ORDER' => ['priority' => 'ASC'],
        ]);
    }

    /**
     * Update a schedule after it has been run.
     *
     * Sets last_run_at to the current time and next_run_at to the current
     * time plus the interval. Both values come from one clock reading, so
     * they are always exactly $intervalSeconds apart.
     *
     * @param int $scheduleId
     * @param int $intervalSeconds Seconds until the next run.
     */
    public function touchSchedule($scheduleId, $intervalSeconds)
    {
        $timestamp = time();

        $this->db->update('pp_cron_schedules', [
            'last_run_at' => date('Y-m-d H:i:s', $timestamp),
            'next_run_at' => date('Y-m-d H:i:s', $timestamp + $intervalSeconds),
        ], ['id' => $scheduleId]);
    }
}