feat: Implement Allegro Order Sync and Status Management
- Added AllegroOrderSyncStateRepository for managing sync state with Allegro orders. - Introduced AllegroOrdersSyncService to handle the synchronization of orders from Allegro. - Created AllegroStatusDiscoveryService to discover and store order statuses from Allegro. - Developed AllegroStatusMappingRepository for managing status mappings between Allegro and OrderPro. - Implemented AllegroStatusSyncService to facilitate status synchronization. - Added CronSettingsController for managing cron job settings related to Allegro integration.
This commit is contained in:
26
src/Modules/Cron/AllegroOrdersImportHandler.php
Normal file
26
src/Modules/Cron/AllegroOrdersImportHandler.php
Normal file
@@ -0,0 +1,26 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Modules\Settings\AllegroOrdersSyncService;
|
||||
|
||||
final class AllegroOrdersImportHandler
|
||||
{
|
||||
public function __construct(private readonly AllegroOrdersSyncService $syncService)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
public function handle(array $payload): array
|
||||
{
|
||||
return $this->syncService->sync([
|
||||
'max_pages' => (int) ($payload['max_pages'] ?? 5),
|
||||
'page_limit' => (int) ($payload['page_limit'] ?? 50),
|
||||
'max_orders' => (int) ($payload['max_orders'] ?? 200),
|
||||
]);
|
||||
}
|
||||
}
|
||||
22
src/Modules/Cron/AllegroStatusSyncHandler.php
Normal file
22
src/Modules/Cron/AllegroStatusSyncHandler.php
Normal file
@@ -0,0 +1,22 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Modules\Settings\AllegroStatusSyncService;
|
||||
|
||||
final class AllegroStatusSyncHandler
|
||||
{
|
||||
public function __construct(private readonly AllegroStatusSyncService $syncService)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
public function handle(array $payload): array
|
||||
{
|
||||
return $this->syncService->sync();
|
||||
}
|
||||
}
|
||||
64
src/Modules/Cron/AllegroTokenRefreshHandler.php
Normal file
64
src/Modules/Cron/AllegroTokenRefreshHandler.php
Normal file
@@ -0,0 +1,64 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Modules\Settings\AllegroIntegrationRepository;
|
||||
use App\Modules\Settings\AllegroOAuthClient;
|
||||
use DateInterval;
|
||||
use DateTimeImmutable;
|
||||
use RuntimeException;
|
||||
|
||||
final class AllegroTokenRefreshHandler
|
||||
{
|
||||
public function __construct(
|
||||
private readonly AllegroIntegrationRepository $repository,
|
||||
private readonly AllegroOAuthClient $oauthClient
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
public function handle(array $payload): array
|
||||
{
|
||||
$credentials = $this->repository->getRefreshTokenCredentials();
|
||||
if ($credentials === null) {
|
||||
throw new RuntimeException('Brak kompletnych danych Allegro OAuth do odswiezenia tokenu.');
|
||||
}
|
||||
|
||||
$token = $this->oauthClient->refreshAccessToken(
|
||||
(string) ($credentials['environment'] ?? 'sandbox'),
|
||||
(string) ($credentials['client_id'] ?? ''),
|
||||
(string) ($credentials['client_secret'] ?? ''),
|
||||
(string) ($credentials['refresh_token'] ?? '')
|
||||
);
|
||||
|
||||
$expiresAt = null;
|
||||
$expiresIn = max(0, (int) ($token['expires_in'] ?? 0));
|
||||
if ($expiresIn > 0) {
|
||||
$expiresAt = (new DateTimeImmutable('now'))
|
||||
->add(new DateInterval('PT' . $expiresIn . 'S'))
|
||||
->format('Y-m-d H:i:s');
|
||||
}
|
||||
|
||||
$refreshToken = trim((string) ($token['refresh_token'] ?? ''));
|
||||
if ($refreshToken === '') {
|
||||
$refreshToken = (string) ($credentials['refresh_token'] ?? '');
|
||||
}
|
||||
|
||||
$this->repository->saveTokens(
|
||||
(string) ($token['access_token'] ?? ''),
|
||||
$refreshToken,
|
||||
(string) ($token['token_type'] ?? ''),
|
||||
(string) ($token['scope'] ?? ''),
|
||||
$expiresAt
|
||||
);
|
||||
|
||||
return [
|
||||
'ok' => true,
|
||||
'expires_at' => $expiresAt,
|
||||
];
|
||||
}
|
||||
}
|
||||
448
src/Modules/Cron/CronRepository.php
Normal file
448
src/Modules/Cron/CronRepository.php
Normal file
@@ -0,0 +1,448 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use DateTimeImmutable;
|
||||
use PDO;
|
||||
use Throwable;
|
||||
|
||||
final class CronRepository
|
||||
{
|
||||
public function __construct(private readonly PDO $pdo)
|
||||
{
|
||||
}
|
||||
|
||||
public function getBoolSetting(string $key, bool $default): bool
|
||||
{
|
||||
$value = $this->getSettingValue($key);
|
||||
if ($value === null) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
return in_array(strtolower($value), ['1', 'true', 'yes', 'on'], true);
|
||||
}
|
||||
|
||||
public function getIntSetting(string $key, int $default, int $min, int $max): int
|
||||
{
|
||||
$value = $this->getSettingValue($key);
|
||||
if ($value === null || !is_numeric($value)) {
|
||||
return max($min, min($max, $default));
|
||||
}
|
||||
|
||||
return max($min, min($max, (int) $value));
|
||||
}
|
||||
|
||||
public function getStringSetting(string $key, string $default): string
|
||||
{
|
||||
$value = $this->getSettingValue($key);
|
||||
if ($value === null) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
return $value;
|
||||
}
|
||||
|
||||
public function upsertSetting(string $key, string $value): void
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'INSERT INTO app_settings (setting_key, setting_value, created_at, updated_at)
|
||||
VALUES (:setting_key, :setting_value, NOW(), NOW())
|
||||
ON DUPLICATE KEY UPDATE
|
||||
setting_value = VALUES(setting_value),
|
||||
updated_at = VALUES(updated_at)'
|
||||
);
|
||||
$statement->execute([
|
||||
'setting_key' => $key,
|
||||
'setting_value' => $value,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listSchedules(): array
|
||||
{
|
||||
$statement = $this->pdo->query(
|
||||
'SELECT id, job_type, interval_seconds, priority, max_attempts, payload, enabled, last_run_at, next_run_at
|
||||
FROM cron_schedules
|
||||
ORDER BY priority ASC, job_type ASC'
|
||||
);
|
||||
$rows = $statement->fetchAll(PDO::FETCH_ASSOC);
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map(fn (array $row): array => $this->normalizeScheduleRow($row), $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listFutureJobs(int $limit = 50): array
|
||||
{
|
||||
$safeLimit = max(1, min(200, $limit));
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, attempts, max_attempts, scheduled_at, last_error, created_at
|
||||
FROM cron_jobs
|
||||
WHERE status IN ("pending", "processing")
|
||||
ORDER BY scheduled_at ASC, priority ASC, id ASC
|
||||
LIMIT :limit'
|
||||
);
|
||||
$statement->bindValue(':limit', $safeLimit, PDO::PARAM_INT);
|
||||
$statement->execute();
|
||||
$rows = $statement->fetchAll(PDO::FETCH_ASSOC);
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map(fn (array $row): array => $this->normalizeJobRow($row), $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listPastJobs(int $limit = 50): array
|
||||
{
|
||||
$safeLimit = max(1, min(200, $limit));
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, attempts, max_attempts, scheduled_at, started_at, completed_at, last_error, created_at
|
||||
FROM cron_jobs
|
||||
WHERE status IN ("completed", "failed", "cancelled")
|
||||
ORDER BY completed_at DESC, id DESC
|
||||
LIMIT :limit'
|
||||
);
|
||||
$statement->bindValue(':limit', $safeLimit, PDO::PARAM_INT);
|
||||
$statement->execute();
|
||||
$rows = $statement->fetchAll(PDO::FETCH_ASSOC);
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map(fn (array $row): array => $this->normalizeJobRow($row), $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function findDueSchedules(DateTimeImmutable $now): array
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, interval_seconds, priority, max_attempts, payload, enabled, last_run_at, next_run_at
|
||||
FROM cron_schedules
|
||||
WHERE enabled = 1
|
||||
AND (next_run_at IS NULL OR next_run_at <= :now)
|
||||
ORDER BY priority ASC, next_run_at ASC, id ASC'
|
||||
);
|
||||
$statement->execute([
|
||||
'now' => $now->format('Y-m-d H:i:s'),
|
||||
]);
|
||||
$rows = $statement->fetchAll(PDO::FETCH_ASSOC);
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map(fn (array $row): array => $this->normalizeScheduleRow($row), $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $schedule
|
||||
*/
|
||||
public function enqueueJobFromSchedule(array $schedule, DateTimeImmutable $now): void
|
||||
{
|
||||
$payloadJson = $this->encodeJson($schedule['payload'] ?? null);
|
||||
|
||||
$jobStatement = $this->pdo->prepare(
|
||||
'INSERT INTO cron_jobs (
|
||||
job_type, status, priority, payload, attempts, max_attempts,
|
||||
scheduled_at, created_at, updated_at
|
||||
) VALUES (
|
||||
:job_type, "pending", :priority, :payload, 0, :max_attempts,
|
||||
:scheduled_at, NOW(), NOW()
|
||||
)'
|
||||
);
|
||||
$jobStatement->execute([
|
||||
'job_type' => (string) ($schedule['job_type'] ?? ''),
|
||||
'priority' => (int) ($schedule['priority'] ?? 100),
|
||||
'payload' => $payloadJson,
|
||||
'max_attempts' => max(1, (int) ($schedule['max_attempts'] ?? 3)),
|
||||
'scheduled_at' => $now->format('Y-m-d H:i:s'),
|
||||
]);
|
||||
|
||||
$intervalSeconds = max(1, (int) ($schedule['interval_seconds'] ?? 60));
|
||||
$nextRunAt = $now->modify('+' . $intervalSeconds . ' seconds');
|
||||
|
||||
$scheduleStatement = $this->pdo->prepare(
|
||||
'UPDATE cron_schedules
|
||||
SET last_run_at = :last_run_at,
|
||||
next_run_at = :next_run_at,
|
||||
updated_at = NOW()
|
||||
WHERE id = :id'
|
||||
);
|
||||
$scheduleStatement->execute([
|
||||
'last_run_at' => $now->format('Y-m-d H:i:s'),
|
||||
'next_run_at' => $nextRunAt->format('Y-m-d H:i:s'),
|
||||
'id' => (int) ($schedule['id'] ?? 0),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<string, mixed>|null
|
||||
*/
|
||||
public function claimNextPendingJob(DateTimeImmutable $now): ?array
|
||||
{
|
||||
$selectStatement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, payload, attempts, max_attempts, scheduled_at
|
||||
FROM cron_jobs
|
||||
WHERE status = "pending"
|
||||
AND scheduled_at <= :now
|
||||
ORDER BY priority ASC, scheduled_at ASC, id ASC
|
||||
LIMIT 1'
|
||||
);
|
||||
$selectStatement->execute([
|
||||
'now' => $now->format('Y-m-d H:i:s'),
|
||||
]);
|
||||
$row = $selectStatement->fetch(PDO::FETCH_ASSOC);
|
||||
if (!is_array($row)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$jobId = (int) ($row['id'] ?? 0);
|
||||
if ($jobId <= 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$updateStatement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs
|
||||
SET status = "processing",
|
||||
started_at = :started_at,
|
||||
attempts = attempts + 1,
|
||||
updated_at = NOW()
|
||||
WHERE id = :id
|
||||
AND status = "pending"'
|
||||
);
|
||||
$updateStatement->execute([
|
||||
'started_at' => $now->format('Y-m-d H:i:s'),
|
||||
'id' => $jobId,
|
||||
]);
|
||||
|
||||
if ($updateStatement->rowCount() !== 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$refreshStatement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, payload, attempts, max_attempts, scheduled_at, started_at
|
||||
FROM cron_jobs
|
||||
WHERE id = :id
|
||||
LIMIT 1'
|
||||
);
|
||||
$refreshStatement->execute(['id' => $jobId]);
|
||||
$claimed = $refreshStatement->fetch(PDO::FETCH_ASSOC);
|
||||
if (!is_array($claimed)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $this->normalizeJobRow($claimed);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed>|null $result
|
||||
*/
|
||||
public function markJobCompleted(int $jobId, ?array $result = null): void
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs
|
||||
SET status = "completed",
|
||||
result = :result,
|
||||
completed_at = NOW(),
|
||||
last_error = NULL,
|
||||
updated_at = NOW()
|
||||
WHERE id = :id'
|
||||
);
|
||||
$statement->execute([
|
||||
'result' => $this->encodeJson($result),
|
||||
'id' => $jobId,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed>|null $result
|
||||
*/
|
||||
public function markJobFailed(int $jobId, string $error, DateTimeImmutable $now, int $retryDelaySeconds = 60, ?array $result = null): void
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT attempts, max_attempts
|
||||
FROM cron_jobs
|
||||
WHERE id = :id
|
||||
LIMIT 1'
|
||||
);
|
||||
$statement->execute(['id' => $jobId]);
|
||||
$row = $statement->fetch(PDO::FETCH_ASSOC);
|
||||
if (!is_array($row)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$attempts = (int) ($row['attempts'] ?? 0);
|
||||
$maxAttempts = max(1, (int) ($row['max_attempts'] ?? 1));
|
||||
$errorMessage = mb_substr(trim($error), 0, 500);
|
||||
|
||||
if ($attempts < $maxAttempts) {
|
||||
$scheduledAt = $now->modify('+' . max(10, $retryDelaySeconds) . ' seconds');
|
||||
$retryStatement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs
|
||||
SET status = "pending",
|
||||
result = :result,
|
||||
scheduled_at = :scheduled_at,
|
||||
started_at = NULL,
|
||||
completed_at = NULL,
|
||||
last_error = :last_error,
|
||||
updated_at = NOW()
|
||||
WHERE id = :id'
|
||||
);
|
||||
$retryStatement->execute([
|
||||
'result' => $this->encodeJson($result),
|
||||
'scheduled_at' => $scheduledAt->format('Y-m-d H:i:s'),
|
||||
'last_error' => $errorMessage,
|
||||
'id' => $jobId,
|
||||
]);
|
||||
return;
|
||||
}
|
||||
|
||||
$failStatement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs
|
||||
SET status = "failed",
|
||||
result = :result,
|
||||
completed_at = NOW(),
|
||||
last_error = :last_error,
|
||||
updated_at = NOW()
|
||||
WHERE id = :id'
|
||||
);
|
||||
$failStatement->execute([
|
||||
'result' => $this->encodeJson($result),
|
||||
'last_error' => $errorMessage,
|
||||
'id' => $jobId,
|
||||
]);
|
||||
}
|
||||
|
||||
public function upsertSchedule(
|
||||
string $jobType,
|
||||
int $intervalSeconds,
|
||||
int $priority,
|
||||
int $maxAttempts,
|
||||
?array $payload,
|
||||
bool $enabled
|
||||
): void {
|
||||
$statement = $this->pdo->prepare(
|
||||
'INSERT INTO cron_schedules (
|
||||
job_type, interval_seconds, priority, max_attempts, payload, enabled, last_run_at, next_run_at, created_at, updated_at
|
||||
) VALUES (
|
||||
:job_type, :interval_seconds, :priority, :max_attempts, :payload, :enabled, NULL, NOW(), NOW(), NOW()
|
||||
)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
interval_seconds = VALUES(interval_seconds),
|
||||
priority = VALUES(priority),
|
||||
max_attempts = VALUES(max_attempts),
|
||||
payload = VALUES(payload),
|
||||
enabled = VALUES(enabled),
|
||||
updated_at = VALUES(updated_at)'
|
||||
);
|
||||
$statement->execute([
|
||||
'job_type' => trim($jobType),
|
||||
'interval_seconds' => max(1, $intervalSeconds),
|
||||
'priority' => max(1, min(255, $priority)),
|
||||
'max_attempts' => max(1, min(20, $maxAttempts)),
|
||||
'payload' => $this->encodeJson($payload),
|
||||
'enabled' => $enabled ? 1 : 0,
|
||||
]);
|
||||
}
|
||||
|
||||
private function getSettingValue(string $key): ?string
|
||||
{
|
||||
try {
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT setting_value
|
||||
FROM app_settings
|
||||
WHERE setting_key = :setting_key
|
||||
LIMIT 1'
|
||||
);
|
||||
$statement->execute(['setting_key' => $key]);
|
||||
$value = $statement->fetchColumn();
|
||||
} catch (Throwable) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!is_string($value)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return trim($value);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $row
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function normalizeScheduleRow(array $row): array
|
||||
{
|
||||
return [
|
||||
'id' => (int) ($row['id'] ?? 0),
|
||||
'job_type' => (string) ($row['job_type'] ?? ''),
|
||||
'interval_seconds' => (int) ($row['interval_seconds'] ?? 0),
|
||||
'priority' => (int) ($row['priority'] ?? 100),
|
||||
'max_attempts' => (int) ($row['max_attempts'] ?? 3),
|
||||
'payload' => $this->decodeJson((string) ($row['payload'] ?? '')),
|
||||
'enabled' => (int) ($row['enabled'] ?? 0) === 1,
|
||||
'last_run_at' => (string) ($row['last_run_at'] ?? ''),
|
||||
'next_run_at' => (string) ($row['next_run_at'] ?? ''),
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $row
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function normalizeJobRow(array $row): array
|
||||
{
|
||||
return [
|
||||
'id' => (int) ($row['id'] ?? 0),
|
||||
'job_type' => (string) ($row['job_type'] ?? ''),
|
||||
'status' => (string) ($row['status'] ?? ''),
|
||||
'priority' => (int) ($row['priority'] ?? 100),
|
||||
'payload' => $this->decodeJson((string) ($row['payload'] ?? '')),
|
||||
'result' => $this->decodeJson((string) ($row['result'] ?? '')),
|
||||
'attempts' => (int) ($row['attempts'] ?? 0),
|
||||
'max_attempts' => (int) ($row['max_attempts'] ?? 3),
|
||||
'scheduled_at' => (string) ($row['scheduled_at'] ?? ''),
|
||||
'started_at' => (string) ($row['started_at'] ?? ''),
|
||||
'completed_at' => (string) ($row['completed_at'] ?? ''),
|
||||
'last_error' => (string) ($row['last_error'] ?? ''),
|
||||
'created_at' => (string) ($row['created_at'] ?? ''),
|
||||
];
|
||||
}
|
||||
|
||||
private function encodeJson(mixed $value): ?string
|
||||
{
|
||||
if ($value === null) {
|
||||
return null;
|
||||
}
|
||||
if (!is_array($value)) {
|
||||
return null;
|
||||
}
|
||||
if ($value === []) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return json_encode($value, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) ?: null;
|
||||
}
|
||||
|
||||
private function decodeJson(string $value): ?array
|
||||
{
|
||||
$trimmed = trim($value);
|
||||
if ($trimmed === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$decoded = json_decode($trimmed, true);
|
||||
return is_array($decoded) ? $decoded : null;
|
||||
}
|
||||
}
|
||||
99
src/Modules/Cron/CronRunner.php
Normal file
99
src/Modules/Cron/CronRunner.php
Normal file
@@ -0,0 +1,99 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Core\Support\Logger;
|
||||
use DateTimeImmutable;
|
||||
use RuntimeException;
|
||||
use Throwable;
|
||||
|
||||
final class CronRunner
|
||||
{
|
||||
/**
|
||||
* @param array<string, object> $handlers
|
||||
*/
|
||||
public function __construct(
|
||||
private readonly CronRepository $repository,
|
||||
private readonly Logger $logger,
|
||||
private readonly array $handlers
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<string, int>
|
||||
*/
|
||||
public function run(int $limit): array
|
||||
{
|
||||
$safeLimit = max(1, min(100, $limit));
|
||||
$now = new DateTimeImmutable('now');
|
||||
|
||||
$dispatched = $this->dispatchDueSchedules($now);
|
||||
$processed = 0;
|
||||
$completed = 0;
|
||||
$failed = 0;
|
||||
|
||||
while ($processed < $safeLimit) {
|
||||
$job = $this->repository->claimNextPendingJob(new DateTimeImmutable('now'));
|
||||
if ($job === null) {
|
||||
break;
|
||||
}
|
||||
|
||||
$processed++;
|
||||
$jobId = (int) ($job['id'] ?? 0);
|
||||
$jobType = (string) ($job['job_type'] ?? '');
|
||||
|
||||
try {
|
||||
$result = $this->handleJob($jobType, is_array($job['payload'] ?? null) ? $job['payload'] : []);
|
||||
$this->repository->markJobCompleted($jobId, $result);
|
||||
$completed++;
|
||||
} catch (Throwable $exception) {
|
||||
$this->repository->markJobFailed($jobId, $exception->getMessage(), new DateTimeImmutable('now'), 60);
|
||||
$this->logger->error('Cron job failed', [
|
||||
'job_id' => $jobId,
|
||||
'job_type' => $jobType,
|
||||
'error' => $exception->getMessage(),
|
||||
]);
|
||||
$failed++;
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'dispatched' => $dispatched,
|
||||
'processed' => $processed,
|
||||
'completed' => $completed,
|
||||
'failed' => $failed,
|
||||
];
|
||||
}
|
||||
|
||||
private function dispatchDueSchedules(DateTimeImmutable $now): int
|
||||
{
|
||||
$schedules = $this->repository->findDueSchedules($now);
|
||||
$count = 0;
|
||||
foreach ($schedules as $schedule) {
|
||||
$this->repository->enqueueJobFromSchedule($schedule, $now);
|
||||
$count++;
|
||||
}
|
||||
|
||||
return $count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function handleJob(string $jobType, array $payload): array
|
||||
{
|
||||
$handler = $this->handlers[$jobType] ?? null;
|
||||
if ($handler === null || !method_exists($handler, 'handle')) {
|
||||
throw new RuntimeException('Brak handlera dla typu joba: ' . $jobType);
|
||||
}
|
||||
|
||||
$result = $handler->handle($payload);
|
||||
if (!is_array($result)) {
|
||||
return ['ok' => true];
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user