201 lines
6.2 KiB
PHP
201 lines
6.2 KiB
PHP
<?php
|
|
declare(strict_types=1);
|
|
|
|
namespace App\Modules\Cron;
|
|
|
|
use RuntimeException;
|
|
use Throwable;
|
|
|
|
final class CronJobProcessor
|
|
{
|
|
/** @var array<string, callable> */
|
|
private array $handlers = [];
|
|
|
|
public function __construct(private readonly CronJobRepository $jobs)
|
|
{
|
|
}
|
|
|
|
public function registerHandler(string $jobType, callable $handler): void
|
|
{
|
|
$normalized = trim($jobType);
|
|
if ($normalized === '') {
|
|
return;
|
|
}
|
|
|
|
$this->handlers[$normalized] = $handler;
|
|
}
|
|
|
|
/**
|
|
* @return array{created:int,skipped:int}
|
|
*/
|
|
public function createScheduledJobs(): array
|
|
{
|
|
$created = 0;
|
|
$skipped = 0;
|
|
$schedules = $this->jobs->getDueSchedules();
|
|
|
|
foreach ($schedules as $schedule) {
|
|
$scheduleId = (int) ($schedule['id'] ?? 0);
|
|
$jobType = trim((string) ($schedule['job_type'] ?? ''));
|
|
$intervalSeconds = max(60, (int) ($schedule['interval_seconds'] ?? 0));
|
|
|
|
if ($scheduleId <= 0 || $jobType === '') {
|
|
continue;
|
|
}
|
|
|
|
$hasPending = $this->jobs->hasPendingJob($jobType);
|
|
if ($hasPending) {
|
|
$skipped++;
|
|
} else {
|
|
$payload = is_array($schedule['payload'] ?? null) ? (array) $schedule['payload'] : null;
|
|
$this->jobs->enqueue(
|
|
$jobType,
|
|
$payload,
|
|
(int) ($schedule['priority'] ?? CronJobType::priorityFor($jobType)),
|
|
(int) ($schedule['max_attempts'] ?? CronJobType::maxAttemptsFor($jobType))
|
|
);
|
|
$created++;
|
|
}
|
|
|
|
$this->jobs->touchSchedule($scheduleId, $intervalSeconds);
|
|
}
|
|
|
|
return [
|
|
'created' => $created,
|
|
'skipped' => $skipped,
|
|
];
|
|
}
|
|
|
|
/**
|
|
* @return array{processed:int,completed:int,retried:int,failed:int}
|
|
*/
|
|
public function processQueue(int $limit = 20): array
|
|
{
|
|
$processed = 0;
|
|
$completed = 0;
|
|
$retried = 0;
|
|
$failed = 0;
|
|
|
|
$jobs = $this->jobs->fetchNext($limit);
|
|
foreach ($jobs as $job) {
|
|
$processed++;
|
|
|
|
$jobId = (int) ($job['id'] ?? 0);
|
|
$jobType = trim((string) ($job['job_type'] ?? ''));
|
|
if ($jobId <= 0 || $jobType === '') {
|
|
continue;
|
|
}
|
|
|
|
$handler = $this->handlers[$jobType] ?? null;
|
|
if (!is_callable($handler)) {
|
|
$defaultBackoff = $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
|
$isFinal = $this->jobs->markFailed(
|
|
$jobId,
|
|
'Brak zarejestrowanego handlera dla typu joba: ' . $jobType,
|
|
$defaultBackoff
|
|
);
|
|
if ($isFinal) {
|
|
$failed++;
|
|
} else {
|
|
$retried++;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
try {
|
|
$payload = is_array($job['payload'] ?? null) ? (array) $job['payload'] : [];
|
|
$result = $handler($payload, $job);
|
|
|
|
$ok = true;
|
|
$message = '';
|
|
$retryAfter = 0;
|
|
$resultPayload = [];
|
|
|
|
if (is_bool($result)) {
|
|
$ok = $result;
|
|
} elseif (is_array($result)) {
|
|
$ok = ($result['ok'] ?? true) === true;
|
|
$message = trim((string) ($result['message'] ?? ''));
|
|
$retryAfter = max(0, (int) ($result['retry_after'] ?? 0));
|
|
$resultPayload = $result;
|
|
}
|
|
|
|
if ($ok) {
|
|
$this->jobs->markCompleted($jobId, $resultPayload === [] ? null : $resultPayload);
|
|
$completed++;
|
|
continue;
|
|
}
|
|
|
|
if ($message === '') {
|
|
$message = 'Handler zakonczyl job niepowodzeniem.';
|
|
}
|
|
$backoffSeconds = $retryAfter > 0 ? $retryAfter : $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
|
$isFinal = $this->jobs->markFailed($jobId, $message, $backoffSeconds);
|
|
if ($isFinal) {
|
|
$failed++;
|
|
} else {
|
|
$retried++;
|
|
}
|
|
} catch (Throwable $exception) {
|
|
$backoffSeconds = $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
|
$isFinal = $this->jobs->markFailed($jobId, $exception->getMessage(), $backoffSeconds);
|
|
if ($isFinal) {
|
|
$failed++;
|
|
} else {
|
|
$retried++;
|
|
}
|
|
}
|
|
}
|
|
|
|
return [
|
|
'processed' => $processed,
|
|
'completed' => $completed,
|
|
'retried' => $retried,
|
|
'failed' => $failed,
|
|
];
|
|
}
|
|
|
|
/**
|
|
* @return array{
|
|
* recovered:int,
|
|
* scheduled_created:int,
|
|
* scheduled_skipped:int,
|
|
* processed:int,
|
|
* completed:int,
|
|
* retried:int,
|
|
* failed:int,
|
|
* cleaned:int
|
|
* }
|
|
*/
|
|
public function run(int $limit = 20): array
|
|
{
|
|
if ($limit <= 0) {
|
|
throw new RuntimeException('Limit przetwarzania cron musi byc wiekszy od 0.');
|
|
}
|
|
|
|
$recovered = $this->jobs->recoverStuck(15);
|
|
$scheduled = $this->createScheduledJobs();
|
|
$processed = $this->processQueue($limit);
|
|
$cleaned = $this->jobs->cleanup(30);
|
|
|
|
return [
|
|
'recovered' => $recovered,
|
|
'scheduled_created' => (int) ($scheduled['created'] ?? 0),
|
|
'scheduled_skipped' => (int) ($scheduled['skipped'] ?? 0),
|
|
'processed' => (int) ($processed['processed'] ?? 0),
|
|
'completed' => (int) ($processed['completed'] ?? 0),
|
|
'retried' => (int) ($processed['retried'] ?? 0),
|
|
'failed' => (int) ($processed['failed'] ?? 0),
|
|
'cleaned' => $cleaned,
|
|
];
|
|
}
|
|
|
|
private function defaultBackoffSeconds(int $attemptsAlreadyDone): int
|
|
{
|
|
$currentAttempt = max(1, $attemptsAlreadyDone + 1);
|
|
$seconds = (int) (60 * (2 ** ($currentAttempt - 1)));
|
|
|
|
return min(3600, max(60, $seconds));
|
|
}
|
|
}
|