getSettingValue($key);
        if ($value === null) {
            return $default;
        }

        return in_array(strtolower($value), ['1', 'true', 'yes', 'on'], true);
    }

    /**
     * Reads an integer setting and clamps it into the range [$min, $max].
     *
     * A missing or non-numeric stored value falls back to $default, which is
     * clamped into the same range.
     */
    public function getIntSetting(string $key, int $default, int $min, int $max): int
    {
        $value = $this->getSettingValue($key);
        if ($value === null || !is_numeric($value)) {
            return max($min, min($max, $default));
        }

        return max($min, min($max, (int) $value));
    }

    /**
     * Reads a string setting (already trimmed by getSettingValue()), or
     * $default when no value is available.
     */
    public function getStringSetting(string $key, string $default): string
    {
        return $this->getSettingValue($key) ?? $default;
    }

    /**
     * Inserts a setting, or overwrites its value and updated_at when the key
     * already exists.
     */
    public function upsertSetting(string $key, string $value): void
    {
        $statement = $this->pdo->prepare(
            'INSERT INTO app_settings (setting_key, setting_value, created_at, updated_at)'
            . ' VALUES (:setting_key, :setting_value, NOW(), NOW())'
            . ' ON DUPLICATE KEY UPDATE setting_value = VALUES(setting_value), updated_at = VALUES(updated_at)'
        );
        $statement->execute([
            'setting_key' => $key,
            'setting_value' => $value,
        ]);
    }

    /**
     * Lists every schedule, ordered by priority and then job type.
     *
     * @return array<int, array<string, mixed>>
     */
    public function listSchedules(): array
    {
        $statement = $this->pdo->query(
            'SELECT id, job_type, interval_seconds, priority, max_attempts, payload, enabled, last_run_at, next_run_at'
            . ' FROM cron_schedules ORDER BY priority ASC, job_type ASC'
        );
        // PDO::query() returns false on failure when the connection is not in
        // exception mode; countPastJobs() guards against the same case.
        if ($statement === false) {
            return [];
        }

        $rows = $statement->fetchAll(PDO::FETCH_ASSOC);
        if (!is_array($rows)) {
            return [];
        }

        return array_map(fn (array $row): array => $this->normalizeScheduleRow($row), $rows);
    }

    /**
     * Lists jobs that are pending or processing, earliest scheduled first.
     *
     * @param int $limit Maximum number of rows; clamped to 1..200.
     *
     * @return array<int, array<string, mixed>>
     */
    public function listFutureJobs(int $limit = 50): array
    {
        $safeLimit = max(1, min(200, $limit));

        $statement = $this->pdo->prepare(
            'SELECT id, job_type, status, priority, attempts, max_attempts, scheduled_at, last_error, created_at'
            . ' FROM cron_jobs WHERE status IN ("pending", "processing")'
            . ' ORDER BY scheduled_at ASC, priority ASC, id ASC LIMIT :limit'
        );
        // Bound as an integer so the value is not quoted as a string in LIMIT.
        $statement->bindValue(':limit', $safeLimit, PDO::PARAM_INT);
        $statement->execute();

        $rows = $statement->fetchAll(PDO::FETCH_ASSOC);
        if (!is_array($rows)) {
            return [];
        }

        return array_map(fn (array $row): array => $this->normalizeJobRow($row), $rows);
    }

    /**
     * Lists completed, failed and cancelled jobs, most recently completed first.
     *
     * @param int $limit  Maximum number of rows; clamped to 1..200.
     * @param int $offset Number of rows to skip; negative values are treated as 0.
     *
     * @return array<int, array<string, mixed>>
     */
    public function listPastJobs(int $limit = 50, int $offset = 0): array
    {
        $safeLimit = max(1, min(200, $limit));
        $safeOffset = max(0, $offset);

        $statement = $this->pdo->prepare(
            'SELECT id, job_type, status, priority, attempts, max_attempts, scheduled_at, started_at, completed_at, last_error, created_at'
            . ' FROM cron_jobs WHERE status IN ("completed", "failed", "cancelled")'
            . ' ORDER BY completed_at DESC, id DESC LIMIT :limit OFFSET :offset'
        );
        $statement->bindValue(':limit', $safeLimit, PDO::PARAM_INT);
        $statement->bindValue(':offset', $safeOffset, PDO::PARAM_INT);
        $statement->execute();

        $rows = $statement->fetchAll(PDO::FETCH_ASSOC);
        if (!is_array($rows)) {
            return [];
        }

        return array_map(fn (array $row): array => $this->normalizeJobRow($row), $rows);
    }

    /**
     * Counts completed, failed and cancelled jobs (the set listPastJobs() pages over).
     */
    public function countPastJobs(): int
    {
        $statement = $this->pdo->query(
            'SELECT COUNT(*) FROM cron_jobs WHERE status IN ("completed", "failed", "cancelled")'
        );
        $value = $statement !== false ? $statement->fetchColumn() : 0;

        return max(0, (int) $value);
    }

    /**
     * Finds enabled schedules whose next_run_at is unset or not later than $now.
     *
     * @return array<int, array<string, mixed>>
     */
    public function findDueSchedules(DateTimeImmutable $now): array
    {
        $statement = $this->pdo->prepare(
            'SELECT id, job_type, interval_seconds, priority, max_attempts, payload, enabled, last_run_at, next_run_at'
            . ' FROM cron_schedules WHERE enabled = 1 AND (next_run_at IS NULL OR next_run_at <= :now)'
            . ' ORDER BY priority ASC, next_run_at ASC, id ASC'
        );
        $statement->execute([
            'now' => $now->format('Y-m-d H:i:s'),
        ]);

        $rows = $statement->fetchAll(PDO::FETCH_ASSOC);
        if (!is_array($rows)) {
            return [];
        }

        return array_map(fn (array $row): array => $this->normalizeScheduleRow($row), $rows);
    }

    /**
     * Creates a pending job from a schedule row and advances the schedule to
     * its next run time ($now + interval_seconds, interval floored at 1).
     *
     * Both writes run in one transaction so a failure of the schedule update
     * cannot leave a job enqueued while the schedule still looks due (which
     * would enqueue the same job again on the next pass). If the caller has
     * already opened a transaction, that one is reused and left open.
     *
     * @param array<string, mixed> $schedule Schedule row as produced by normalizeScheduleRow().
     *
     * @throws Throwable Any failure from the underlying statements is rethrown
     *                   after rolling back a transaction started here.
     */
    public function enqueueJobFromSchedule(array $schedule, DateTimeImmutable $now): void
    {
        $payloadJson = $this->encodeJson($schedule['payload'] ?? null);
        $nowFormatted = $now->format('Y-m-d H:i:s');
        $intervalSeconds = max(1, (int) ($schedule['interval_seconds'] ?? 60));
        $nextRunAt = $now->modify('+' . $intervalSeconds . ' seconds');

        // Only manage the transaction when the caller has not opened one.
        $ownsTransaction = !$this->pdo->inTransaction();
        if ($ownsTransaction) {
            $this->pdo->beginTransaction();
        }

        try {
            $jobStatement = $this->pdo->prepare(
                'INSERT INTO cron_jobs ( job_type, status, priority, payload, attempts, max_attempts,'
                . ' scheduled_at, created_at, updated_at )'
                . ' VALUES ( :job_type, "pending", :priority, :payload, 0, :max_attempts, :scheduled_at, NOW(), NOW() )'
            );
            $jobStatement->execute([
                'job_type' => (string) ($schedule['job_type'] ?? ''),
                'priority' => (int) ($schedule['priority'] ?? 100),
                'payload' => $payloadJson,
                'max_attempts' => max(1, (int) ($schedule['max_attempts'] ?? 3)),
                'scheduled_at' => $nowFormatted,
            ]);

            $scheduleStatement = $this->pdo->prepare(
                'UPDATE cron_schedules SET last_run_at = :last_run_at, next_run_at = :next_run_at, updated_at = NOW() WHERE id = :id'
            );
            $scheduleStatement->execute([
                'last_run_at' => $nowFormatted,
                'next_run_at' => $nextRunAt->format('Y-m-d H:i:s'),
                'id' => (int) ($schedule['id'] ?? 0),
            ]);

            if ($ownsTransaction) {
                $this->pdo->commit();
            }
        } catch (Throwable $exception) {
            if ($ownsTransaction && $this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }

            throw $exception;
        }
    }

    /**
     * Claims the highest-priority pending job that is due at $now by flipping
     * it to "processing" and incrementing its attempt counter.
     *
     * The claim is optimistic: the UPDATE only matches while the row is still
     * "pending". When that UPDATE affects no row (another worker claimed the
     * job first), the selection is retried a few times instead of reporting
     * "no work" while other due jobs may still be waiting.
     *
     * @return array<string, mixed>|null The claimed job, or null when nothing
     *                                   is due or every claim attempt lost.
     */
    public function claimNextPendingJob(DateTimeImmutable $now): ?array
    {
        $claimedAt = $now->format('Y-m-d H:i:s');

        $selectStatement = $this->pdo->prepare(
            'SELECT id, job_type, status, priority, payload, attempts, max_attempts, scheduled_at'
            . ' FROM cron_jobs WHERE status = "pending" AND scheduled_at <= :now'
            . ' ORDER BY priority ASC, scheduled_at ASC, id ASC LIMIT 1'
        );

        // Bounded so a worker that keeps losing the race cannot spin forever.
        for ($attempt = 0; $attempt < 3; $attempt++) {
            $selectStatement->execute([
                'now' => $claimedAt,
            ]);
            $row = $selectStatement->fetch(PDO::FETCH_ASSOC);
            if (!is_array($row)) {
                return null;
            }

            $jobId = (int) ($row['id'] ?? 0);
            if ($jobId <= 0) {
                return null;
            }

            $updateStatement = $this->pdo->prepare(
                'UPDATE cron_jobs SET status = "processing", started_at = :started_at, attempts = attempts + 1, updated_at = NOW()'
                . ' WHERE id = :id AND status = "pending"'
            );
            $updateStatement->execute([
                'started_at' => $claimedAt,
                'id' => $jobId,
            ]);
            if ($updateStatement->rowCount() !== 1) {
                // The row was no longer pending; try the next candidate.
                continue;
            }

            // Re-read the row so the returned job carries the post-claim
            // status, attempts and started_at.
            $refreshStatement = $this->pdo->prepare(
                'SELECT id, job_type, status, priority, payload, attempts, max_attempts, scheduled_at, started_at'
                . ' FROM cron_jobs WHERE id = :id LIMIT 1'
            );
            $refreshStatement->execute(['id' => $jobId]);
            $claimed = $refreshStatement->fetch(PDO::FETCH_ASSOC);
            if (!is_array($claimed)) {
                return null;
            }

            return $this->normalizeJobRow($claimed);
        }

        return null;
    }

    /**
     * Marks a job as completed, stores its result and clears last_error.
     *
     * @param array<string, mixed>|null $result Stored as JSON; null or an empty array is stored as NULL.
     */
    public function markJobCompleted(int $jobId, ?array $result = null): void
    {
        $statement = $this->pdo->prepare(
            'UPDATE cron_jobs SET status = "completed", result = :result, completed_at = NOW(), last_error = NULL, updated_at = NOW() WHERE id = :id'
        );
        $statement->execute([
            'result' => $this->encodeJson($result),
            'id' => $jobId,
        ]);
    }

    /**
     * Records a job failure: re-queues the job while attempts remain,
     * otherwise marks it as failed.
     *
     * Does nothing when no job with the given id exists.
     *
     * @param string                    $error             Trimmed and cut to 500 characters before storing.
     * @param int                       $retryDelaySeconds Delay before the retry; values below 10 are raised to 10.
     * @param array<string, mixed>|null $result            Stored as JSON; null or an empty array is stored as NULL.
     */
    public function markJobFailed(int $jobId, string $error, DateTimeImmutable $now, int $retryDelaySeconds = 60, ?array $result = null): void
    {
        $statement = $this->pdo->prepare(
            'SELECT attempts, max_attempts FROM cron_jobs WHERE id = :id LIMIT 1'
        );
        $statement->execute(['id' => $jobId]);
        $row = $statement->fetch(PDO::FETCH_ASSOC);
        if (!is_array($row)) {
            return;
        }

        $attempts = (int) ($row['attempts'] ?? 0);
        $maxAttempts = max(1, (int) ($row['max_attempts'] ?? 1));
        $errorMessage = mb_substr(trim($error), 0, 500);

        if ($attempts < $maxAttempts) {
            // Attempts remain: put the job back in the queue for a later run.
            $scheduledAt = $now->modify('+' . max(10, $retryDelaySeconds) . ' seconds');
            $retryStatement = $this->pdo->prepare(
                'UPDATE cron_jobs SET status = "pending", result = :result, scheduled_at = :scheduled_at,'
                . ' started_at = NULL, completed_at = NULL, last_error = :last_error, updated_at = NOW() WHERE id = :id'
            );
            $retryStatement->execute([
                'result' => $this->encodeJson($result),
                'scheduled_at' => $scheduledAt->format('Y-m-d H:i:s'),
                'last_error' => $errorMessage,
                'id' => $jobId,
            ]);

            return;
        }

        $failStatement = $this->pdo->prepare(
            'UPDATE cron_jobs SET status = "failed", result = :result, completed_at = NOW(), last_error = :last_error, updated_at = NOW() WHERE id = :id'
        );
        $failStatement->execute([
            'result' => $this->encodeJson($result),
            'last_error' => $errorMessage,
            'id' => $jobId,
        ]);
    }

    /**
     * Creates a schedule, or updates its configuration when a duplicate key
     * is hit.
     *
     * A new schedule gets next_run_at = NOW(); on update, last_run_at and
     * next_run_at are left as they are.
     *
     * @param string                    $jobType         Trimmed before storing.
     * @param int                       $intervalSeconds Floored at 1.
     * @param int                       $priority        Clamped to 1..255.
     * @param int                       $maxAttempts     Clamped to 1..20.
     * @param array<string, mixed>|null $payload         Stored as JSON; null or an empty array is stored as NULL.
     */
    public function upsertSchedule(
        string $jobType,
        int $intervalSeconds,
        int $priority,
        int $maxAttempts,
        ?array $payload,
        bool $enabled
    ): void {
        $statement = $this->pdo->prepare(
            'INSERT INTO cron_schedules ( job_type, interval_seconds, priority, max_attempts, payload, enabled,'
            . ' last_run_at, next_run_at, created_at, updated_at )'
            . ' VALUES ( :job_type, :interval_seconds, :priority, :max_attempts, :payload, :enabled, NULL, NOW(), NOW(), NOW() )'
            . ' ON DUPLICATE KEY UPDATE interval_seconds = VALUES(interval_seconds), priority = VALUES(priority),'
            . ' max_attempts = VALUES(max_attempts), payload = VALUES(payload), enabled = VALUES(enabled),'
            . ' updated_at = VALUES(updated_at)'
        );
        $statement->execute([
            'job_type' => trim($jobType),
            'interval_seconds' => max(1, $intervalSeconds),
            'priority' => max(1, min(255, $priority)),
            'max_attempts' => max(1, min(20, $maxAttempts)),
            'payload' => $this->encodeJson($payload),
            'enabled' => $enabled ? 1 : 0,
        ]);
    }

    /**
     * Changes the interval of the schedule with the given job type
     * (interval floored at 1 second).
     */
    public function updateScheduleInterval(string $jobType, int $intervalSeconds): void
    {
        $statement = $this->pdo->prepare(
            'UPDATE cron_schedules SET interval_seconds = :interval_seconds, updated_at = NOW() WHERE job_type = :job_type'
        );
        $statement->execute([
            'interval_seconds' => max(1, $intervalSeconds),
            'job_type' => trim($jobType),
        ]);
    }

    /**
     * Returns the stored interval for a job type, or null when no schedule
     * row matches.
     */
    public function getScheduleInterval(string $jobType): ?int
    {
        $statement = $this->pdo->prepare(
            'SELECT interval_seconds FROM cron_schedules WHERE job_type = :job_type LIMIT 1'
        );
        $statement->execute(['job_type' => trim($jobType)]);
        $value = $statement->fetchColumn();

        // fetchColumn() yields false when there is no row.
        return $value !== false ? (int) $value : null;
    }

    /**
     * Loads the raw setting value, trimmed.
     *
     * Returns null when the key is absent, the stored value is not a string,
     * or the lookup throws; callers then fall back to their defaults.
     */
    private function getSettingValue(string $key): ?string
    {
        try {
            $statement = $this->pdo->prepare(
                'SELECT setting_value FROM app_settings WHERE setting_key = :setting_key LIMIT 1'
            );
            $statement->execute(['setting_key' => $key]);
            $value = $statement->fetchColumn();
        } catch (Throwable) {
            // Deliberate best effort: any lookup failure is treated as "not set".
            return null;
        }

        if (!is_string($value)) {
            return null;
        }

        return trim($value);
    }

    /**
     * Casts a raw cron_schedules row to typed values, decoding the JSON
     * payload and substituting defaults for columns missing from the row.
     *
     * @param array<string, mixed> $row
     *
     * @return array<string, mixed>
     */
    private function normalizeScheduleRow(array $row): array
    {
        return [
            'id' => (int) ($row['id'] ?? 0),
            'job_type' => (string) ($row['job_type'] ?? ''),
            'interval_seconds' => (int) ($row['interval_seconds'] ?? 0),
            'priority' => (int) ($row['priority'] ?? 100),
            'max_attempts' => (int) ($row['max_attempts'] ?? 3),
            'payload' => $this->decodeJson((string) ($row['payload'] ?? '')),
            'enabled' => (int) ($row['enabled'] ?? 0) === 1,
            'last_run_at' => (string) ($row['last_run_at'] ?? ''),
            'next_run_at' => (string) ($row['next_run_at'] ?? ''),
        ];
    }

    /**
     * Casts a raw cron_jobs row to typed values, decoding the JSON payload
     * and result.
     *
     * Columns that were not selected (or are NULL) come back as their
     * defaults: empty strings for text and timestamps, null for JSON fields.
     *
     * @param array<string, mixed> $row
     *
     * @return array<string, mixed>
     */
    private function normalizeJobRow(array $row): array
    {
        return [
            'id' => (int) ($row['id'] ?? 0),
            'job_type' => (string) ($row['job_type'] ?? ''),
            'status' => (string) ($row['status'] ?? ''),
            'priority' => (int) ($row['priority'] ?? 100),
            'payload' => $this->decodeJson((string) ($row['payload'] ?? '')),
            'result' => $this->decodeJson((string) ($row['result'] ?? '')),
            'attempts' => (int) ($row['attempts'] ?? 0),
            'max_attempts' => (int) ($row['max_attempts'] ?? 3),
            'scheduled_at' => (string) ($row['scheduled_at'] ?? ''),
            'started_at' => (string) ($row['started_at'] ?? ''),
            'completed_at' => (string) ($row['completed_at'] ?? ''),
            'last_error' => (string) ($row['last_error'] ?? ''),
            'created_at' => (string) ($row['created_at'] ?? ''),
        ];
    }

    /**
     * Encodes a non-empty array as JSON.
     *
     * Returns null for anything that is not an array, for an empty array, and
     * when json_encode() fails.
     */
    private function encodeJson(mixed $value): ?string
    {
        if (!is_array($value) || $value === []) {
            return null;
        }

        $json = json_encode($value, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);

        return $json === false ? null : $json;
    }

    /**
     * Decodes a JSON string into an array.
     *
     * Returns null for blank input, invalid JSON, or JSON that does not
     * decode to an array (for example a bare number or string).
     *
     * @return array<mixed>|null
     */
    private function decodeJson(string $value): ?array
    {
        $trimmed = trim($value);
        if ($trimmed === '') {
            return null;
        }

        $decoded = json_decode($trimmed, true);

        return is_array($decoded) ? $decoded : null;
    }
}