- Email notyfikacji zawiera numer zamówienia, klienta, datę, kwotę
- Order joby (send_order, sync_payment, sync_status) ponawiane w nieskończoność co 30 min
- Rozróżnienie PONAWIANY vs TRWAŁY BŁĄD w emailu
- Cleanup stuck jobów po udanym wysłaniu zamówienia
- +2 testy infinite retry w CronJobRepositoryTest

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
261 lines
7.4 KiB
PHP
261 lines
7.4 KiB
PHP
<?php
|
|
|
|
namespace Domain\CronJob;
|
|
|
|
/**
 * Persistence layer for the cron job queue (`pp_cron_jobs`) and the
 * recurring schedules (`pp_cron_schedules`).
 *
 * All queries go through the injected Medoo handle; status, priority and
 * backoff values come from CronJobType.
 */
class CronJobRepository
{
    /** @var \medoo Database handle used for every query in this repository */
    private $db;

    /**
     * @param \medoo $db
     */
    public function __construct($db)
    {
        $this->db = $db;
    }

    /**
     * Add a job to the queue.
     *
     * The job is stored with status "pending". When $payload is given it is
     * stored JSON-encoded; when $scheduledAt is empty the current time is used.
     *
     * @param string $jobType
     * @param array|null $payload
     * @param int $priority
     * @param int $maxAttempts
     * @param string|null $scheduledAt Timestamp in 'Y-m-d H:i:s' format
     * @return int|null ID of the new job, or null when the database reports no ID
     */
    public function enqueue($jobType, $payload = null, $priority = CronJobType::PRIORITY_NORMAL, $maxAttempts = 10, $scheduledAt = null)
    {
        $data = [
            'job_type' => $jobType,
            'status' => CronJobType::STATUS_PENDING,
            'priority' => $priority,
            'max_attempts' => $maxAttempts,
            'scheduled_at' => $scheduledAt ? $scheduledAt : date('Y-m-d H:i:s'),
        ];

        if ($payload !== null) {
            $data['payload'] = json_encode($payload);
        }

        $this->db->insert('pp_cron_jobs', $data);
        $id = $this->db->id();

        return $id ? (int) $id : null;
    }

    /**
     * Claim the next due jobs for processing.
     *
     * Steps: SELECT due pending jobs, UPDATE them to "processing" (only rows
     * that are still pending), then re-SELECT to confirm which rows this call
     * actually took over. SELECT + UPDATE is not fully atomic without a
     * transaction; the re-SELECT narrows the race window between workers.
     *
     * NOTE(review): the re-SELECT identifies "our" rows by started_at, which
     * has one-second resolution here, so two workers claiming within the same
     * second could presumably both see the same rows — confirm whether
     * multiple workers can run concurrently.
     *
     * The returned rows are ordered by priority, then scheduled_at (the same
     * order used to pick them), and each non-null payload is JSON-decoded
     * into an associative array.
     *
     * @param int $limit Maximum number of jobs to claim
     * @return array Claimed job rows; empty array when nothing was claimed
     */
    public function fetchNext($limit = 5)
    {
        $now = date('Y-m-d H:i:s');
        $order = ['priority' => 'ASC', 'scheduled_at' => 'ASC'];

        $jobs = $this->db->select('pp_cron_jobs', '*', [
            'status' => CronJobType::STATUS_PENDING,
            'scheduled_at[<=]' => $now,
            'ORDER' => $order,
            'LIMIT' => $limit,
        ]);

        if (empty($jobs)) {
            return [];
        }

        $ids = array_column($jobs, 'id');

        // The status condition keeps us from overwriting rows that another
        // worker has already moved out of "pending".
        $this->db->update('pp_cron_jobs', [
            'status' => CronJobType::STATUS_PROCESSING,
            'started_at' => $now,
            'attempts[+]' => 1,
        ], [
            'id' => $ids,
            'status' => CronJobType::STATUS_PENDING,
        ]);

        // Re-SELECT: confirm which jobs were actually taken over. The ORDER
        // clause is repeated so the caller receives jobs in priority order
        // rather than in whatever order the database returns them.
        $claimed = $this->db->select('pp_cron_jobs', '*', [
            'id' => $ids,
            'status' => CronJobType::STATUS_PROCESSING,
            'started_at' => $now,
            'ORDER' => $order,
        ]);

        if (empty($claimed)) {
            return [];
        }

        // Assign by key instead of iterating by reference, so no dangling
        // reference to the last element is left behind.
        foreach ($claimed as $index => $job) {
            if ($job['payload'] !== null) {
                $claimed[$index]['payload'] = json_decode($job['payload'], true);
            }
        }

        return $claimed;
    }

    /**
     * Mark a job as completed.
     *
     * @param int $jobId
     * @param mixed $result Stored JSON-encoded when not null
     */
    public function markCompleted($jobId, $result = null)
    {
        $data = [
            'status' => CronJobType::STATUS_COMPLETED,
            'completed_at' => date('Y-m-d H:i:s'),
        ];

        if ($result !== null) {
            $data['result'] = json_encode($result);
        }

        $this->db->update('pp_cron_jobs', $data, ['id' => $jobId]);
    }

    /**
     * Record a failed attempt and decide whether the job is retried.
     *
     * - Order-related Apilo jobs (per CronJobType::isOrderRelatedApiloJob) are
     *   always put back to "pending" after a fixed delay of
     *   CronJobType::APILO_ORDER_BACKOFF_SECONDS, regardless of max_attempts.
     * - Other jobs become permanently "failed" once attempts >= max_attempts.
     * - Otherwise the job returns to "pending" with a delay from
     *   CronJobType::calculateBackoff().
     *
     * The stored error message is truncated to 500 characters.
     *
     * @param int $jobId
     * @param string $error
     * @param int $attempt Attempt number; used only when the job row cannot be read
     */
    public function markFailed($jobId, $error, $attempt = 1)
    {
        $job = $this->db->get('pp_cron_jobs', ['job_type', 'max_attempts', 'attempts'], ['id' => $jobId]);

        // Fall back to the caller-supplied attempt number and defaults when
        // the row could not be loaded.
        $attempts = $job ? (int) $job['attempts'] : $attempt;
        $maxAttempts = $job ? (int) $job['max_attempts'] : 10;
        $jobType = $job ? $job['job_type'] : '';

        $lastError = mb_substr($error, 0, 500);

        // Order-related Apilo jobs — retried indefinitely at a fixed interval
        if (CronJobType::isOrderRelatedApiloJob($jobType)) {
            $nextRun = date('Y-m-d H:i:s', time() + CronJobType::APILO_ORDER_BACKOFF_SECONDS);
            $this->db->update('pp_cron_jobs', [
                'status' => CronJobType::STATUS_PENDING,
                'last_error' => $lastError,
                'scheduled_at' => $nextRun,
            ], ['id' => $jobId]);
            return;
        }

        if ($attempts >= $maxAttempts) {
            // Attempt limit reached — permanently failed
            $this->db->update('pp_cron_jobs', [
                'status' => CronJobType::STATUS_FAILED,
                'last_error' => $lastError,
                'completed_at' => date('Y-m-d H:i:s'),
            ], ['id' => $jobId]);
        } else {
            // Back to pending, delayed by the backoff
            $backoff = CronJobType::calculateBackoff($attempts);
            $nextRun = date('Y-m-d H:i:s', time() + $backoff);

            $this->db->update('pp_cron_jobs', [
                'status' => CronJobType::STATUS_PENDING,
                'last_error' => $lastError,
                'scheduled_at' => $nextRun,
            ], ['id' => $jobId]);
        }
    }

    /**
     * Check whether a pending or processing job of the given type exists,
     * optionally restricted to an exact payload.
     *
     * The payload is compared as its JSON-encoded string, so it only matches
     * rows whose stored payload is byte-identical to json_encode($payloadMatch).
     *
     * @param string $jobType
     * @param array|null $payloadMatch
     * @return bool
     */
    public function hasPendingJob($jobType, $payloadMatch = null)
    {
        $where = [
            'job_type' => $jobType,
            'status' => [CronJobType::STATUS_PENDING, CronJobType::STATUS_PROCESSING],
        ];

        if ($payloadMatch !== null) {
            $where['payload'] = json_encode($payloadMatch);
        }

        $count = $this->db->count('pp_cron_jobs', $where);
        return $count > 0;
    }

    /**
     * Delete old finished jobs (completed, failed or cancelled) whose
     * updated_at is older than the given number of days.
     *
     * @param int $olderThanDays
     */
    public function cleanup($olderThanDays = 30)
    {
        $cutoff = date('Y-m-d H:i:s', time() - ($olderThanDays * 86400));

        $this->db->delete('pp_cron_jobs', [
            'status' => [CronJobType::STATUS_COMPLETED, CronJobType::STATUS_FAILED, CronJobType::STATUS_CANCELLED],
            'updated_at[<]' => $cutoff,
        ]);
    }

    /**
     * Recover stuck jobs: rows still "processing" whose started_at is older
     * than the given number of minutes are reset to "pending" with
     * started_at cleared.
     *
     * @param int $olderThanMinutes
     */
    public function recoverStuck($olderThanMinutes = 30)
    {
        $cutoff = date('Y-m-d H:i:s', time() - ($olderThanMinutes * 60));

        $this->db->update('pp_cron_jobs', [
            'status' => CronJobType::STATUS_PENDING,
            'started_at' => null,
        ], [
            'status' => CronJobType::STATUS_PROCESSING,
            'started_at[<]' => $cutoff,
        ]);
    }

    /**
     * Fetch the schedules that are due to run: enabled schedules whose
     * next_run_at is unset or not later than now, ordered by priority.
     *
     * @return array
     */
    public function getDueSchedules()
    {
        $now = date('Y-m-d H:i:s');

        return $this->db->select('pp_cron_schedules', '*', [
            'enabled' => 1,
            'OR' => [
                'next_run_at' => null,
                'next_run_at[<=]' => $now,
            ],
            'ORDER' => ['priority' => 'ASC'],
        ]);
    }

    /**
     * Update a schedule after it has been run: last_run_at becomes now and
     * next_run_at becomes now + $intervalSeconds.
     *
     * @param int $scheduleId
     * @param int $intervalSeconds
     */
    public function touchSchedule($scheduleId, $intervalSeconds)
    {
        $now = date('Y-m-d H:i:s');
        $nextRun = date('Y-m-d H:i:s', time() + $intervalSeconds);

        $this->db->update('pp_cron_schedules', [
            'last_run_at' => $now,
            'next_run_at' => $nextRun,
        ], ['id' => $scheduleId]);
    }
}
|