Add Orders and Order Status repositories with pagination and management features
- Implemented OrdersRepository for handling order data with pagination, filtering, and sorting capabilities. - Added methods for retrieving order status options, quick stats, and detailed order information. - Created OrderStatusRepository for managing order status groups and statuses, including CRUD operations and sorting. - Introduced a bootstrap file for test environment setup and autoloading.
This commit is contained in:
@@ -1,200 +0,0 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use RuntimeException;
|
||||
use Throwable;
|
||||
|
||||
final class CronJobProcessor
|
||||
{
|
||||
/** @var array<string, callable> */
|
||||
private array $handlers = [];
|
||||
|
||||
public function __construct(private readonly CronJobRepository $jobs)
|
||||
{
|
||||
}
|
||||
|
||||
public function registerHandler(string $jobType, callable $handler): void
|
||||
{
|
||||
$normalized = trim($jobType);
|
||||
if ($normalized === '') {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->handlers[$normalized] = $handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{created:int,skipped:int}
|
||||
*/
|
||||
public function createScheduledJobs(): array
|
||||
{
|
||||
$created = 0;
|
||||
$skipped = 0;
|
||||
$schedules = $this->jobs->getDueSchedules();
|
||||
|
||||
foreach ($schedules as $schedule) {
|
||||
$scheduleId = (int) ($schedule['id'] ?? 0);
|
||||
$jobType = trim((string) ($schedule['job_type'] ?? ''));
|
||||
$intervalSeconds = max(60, (int) ($schedule['interval_seconds'] ?? 0));
|
||||
|
||||
if ($scheduleId <= 0 || $jobType === '') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$hasPending = $this->jobs->hasPendingJob($jobType);
|
||||
if ($hasPending) {
|
||||
$skipped++;
|
||||
} else {
|
||||
$payload = is_array($schedule['payload'] ?? null) ? (array) $schedule['payload'] : null;
|
||||
$this->jobs->enqueue(
|
||||
$jobType,
|
||||
$payload,
|
||||
(int) ($schedule['priority'] ?? CronJobType::priorityFor($jobType)),
|
||||
(int) ($schedule['max_attempts'] ?? CronJobType::maxAttemptsFor($jobType))
|
||||
);
|
||||
$created++;
|
||||
}
|
||||
|
||||
$this->jobs->touchSchedule($scheduleId, $intervalSeconds);
|
||||
}
|
||||
|
||||
return [
|
||||
'created' => $created,
|
||||
'skipped' => $skipped,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{processed:int,completed:int,retried:int,failed:int}
|
||||
*/
|
||||
public function processQueue(int $limit = 20): array
|
||||
{
|
||||
$processed = 0;
|
||||
$completed = 0;
|
||||
$retried = 0;
|
||||
$failed = 0;
|
||||
|
||||
$jobs = $this->jobs->fetchNext($limit);
|
||||
foreach ($jobs as $job) {
|
||||
$processed++;
|
||||
|
||||
$jobId = (int) ($job['id'] ?? 0);
|
||||
$jobType = trim((string) ($job['job_type'] ?? ''));
|
||||
if ($jobId <= 0 || $jobType === '') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$handler = $this->handlers[$jobType] ?? null;
|
||||
if (!is_callable($handler)) {
|
||||
$defaultBackoff = $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
||||
$isFinal = $this->jobs->markFailed(
|
||||
$jobId,
|
||||
'Brak zarejestrowanego handlera dla typu joba: ' . $jobType,
|
||||
$defaultBackoff
|
||||
);
|
||||
if ($isFinal) {
|
||||
$failed++;
|
||||
} else {
|
||||
$retried++;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$payload = is_array($job['payload'] ?? null) ? (array) $job['payload'] : [];
|
||||
$result = $handler($payload, $job);
|
||||
|
||||
$ok = true;
|
||||
$message = '';
|
||||
$retryAfter = 0;
|
||||
$resultPayload = [];
|
||||
|
||||
if (is_bool($result)) {
|
||||
$ok = $result;
|
||||
} elseif (is_array($result)) {
|
||||
$ok = ($result['ok'] ?? true) === true;
|
||||
$message = trim((string) ($result['message'] ?? ''));
|
||||
$retryAfter = max(0, (int) ($result['retry_after'] ?? 0));
|
||||
$resultPayload = $result;
|
||||
}
|
||||
|
||||
if ($ok) {
|
||||
$this->jobs->markCompleted($jobId, $resultPayload === [] ? null : $resultPayload);
|
||||
$completed++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($message === '') {
|
||||
$message = 'Handler zakonczyl job niepowodzeniem.';
|
||||
}
|
||||
$backoffSeconds = $retryAfter > 0 ? $retryAfter : $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
||||
$isFinal = $this->jobs->markFailed($jobId, $message, $backoffSeconds);
|
||||
if ($isFinal) {
|
||||
$failed++;
|
||||
} else {
|
||||
$retried++;
|
||||
}
|
||||
} catch (Throwable $exception) {
|
||||
$backoffSeconds = $this->defaultBackoffSeconds((int) ($job['attempts'] ?? 0));
|
||||
$isFinal = $this->jobs->markFailed($jobId, $exception->getMessage(), $backoffSeconds);
|
||||
if ($isFinal) {
|
||||
$failed++;
|
||||
} else {
|
||||
$retried++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'processed' => $processed,
|
||||
'completed' => $completed,
|
||||
'retried' => $retried,
|
||||
'failed' => $failed,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{
|
||||
* recovered:int,
|
||||
* scheduled_created:int,
|
||||
* scheduled_skipped:int,
|
||||
* processed:int,
|
||||
* completed:int,
|
||||
* retried:int,
|
||||
* failed:int,
|
||||
* cleaned:int
|
||||
* }
|
||||
*/
|
||||
public function run(int $limit = 20): array
|
||||
{
|
||||
if ($limit <= 0) {
|
||||
throw new RuntimeException('Limit przetwarzania cron musi byc wiekszy od 0.');
|
||||
}
|
||||
|
||||
$recovered = $this->jobs->recoverStuck(15);
|
||||
$scheduled = $this->createScheduledJobs();
|
||||
$processed = $this->processQueue($limit);
|
||||
$cleaned = $this->jobs->cleanup(30);
|
||||
|
||||
return [
|
||||
'recovered' => $recovered,
|
||||
'scheduled_created' => (int) ($scheduled['created'] ?? 0),
|
||||
'scheduled_skipped' => (int) ($scheduled['skipped'] ?? 0),
|
||||
'processed' => (int) ($processed['processed'] ?? 0),
|
||||
'completed' => (int) ($processed['completed'] ?? 0),
|
||||
'retried' => (int) ($processed['retried'] ?? 0),
|
||||
'failed' => (int) ($processed['failed'] ?? 0),
|
||||
'cleaned' => $cleaned,
|
||||
];
|
||||
}
|
||||
|
||||
private function defaultBackoffSeconds(int $attemptsAlreadyDone): int
|
||||
{
|
||||
$currentAttempt = max(1, $attemptsAlreadyDone + 1);
|
||||
$seconds = (int) (60 * (2 ** ($currentAttempt - 1)));
|
||||
|
||||
return min(3600, max(60, $seconds));
|
||||
}
|
||||
}
|
||||
@@ -1,517 +0,0 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use DateTimeImmutable;
|
||||
use PDO;
|
||||
use Throwable;
|
||||
|
||||
final class CronJobRepository
|
||||
{
|
||||
public function __construct(private readonly PDO $pdo)
|
||||
{
|
||||
}
|
||||
|
||||
public function enqueue(
|
||||
string $jobType,
|
||||
?array $payload = null,
|
||||
?int $priority = null,
|
||||
?int $maxAttempts = null,
|
||||
?string $scheduledAt = null
|
||||
): int {
|
||||
$statement = $this->pdo->prepare(
|
||||
'INSERT INTO cron_jobs (
|
||||
job_type, status, priority, payload, attempts, max_attempts,
|
||||
scheduled_at, created_at, updated_at
|
||||
) VALUES (
|
||||
:job_type, :status, :priority, :payload, :attempts, :max_attempts,
|
||||
:scheduled_at, :created_at, :updated_at
|
||||
)'
|
||||
);
|
||||
|
||||
$now = date('Y-m-d H:i:s');
|
||||
$scheduled = $scheduledAt !== null && trim($scheduledAt) !== ''
|
||||
? trim($scheduledAt)
|
||||
: $now;
|
||||
$resolvedPriority = $priority !== null && $priority >= 0
|
||||
? min(255, $priority)
|
||||
: CronJobType::priorityFor($jobType);
|
||||
$resolvedMaxAttempts = $maxAttempts !== null && $maxAttempts > 0
|
||||
? min(999, $maxAttempts)
|
||||
: CronJobType::maxAttemptsFor($jobType);
|
||||
|
||||
$statement->execute([
|
||||
'job_type' => trim($jobType),
|
||||
'status' => 'pending',
|
||||
'priority' => $resolvedPriority,
|
||||
'payload' => $this->encodeJson($payload),
|
||||
'attempts' => 0,
|
||||
'max_attempts' => $resolvedMaxAttempts,
|
||||
'scheduled_at' => $scheduled,
|
||||
'created_at' => $now,
|
||||
'updated_at' => $now,
|
||||
]);
|
||||
|
||||
return (int) $this->pdo->lastInsertId();
|
||||
}
|
||||
|
||||
public function hasPendingJob(string $jobType, ?array $payload = null): bool
|
||||
{
|
||||
$sql = 'SELECT 1
|
||||
FROM cron_jobs
|
||||
WHERE job_type = :job_type
|
||||
AND status IN (\'pending\', \'processing\')';
|
||||
$params = [
|
||||
'job_type' => trim($jobType),
|
||||
];
|
||||
|
||||
if ($payload !== null) {
|
||||
$sql .= ' AND payload = :payload';
|
||||
$params['payload'] = $this->encodeJson($payload);
|
||||
}
|
||||
|
||||
$sql .= ' LIMIT 1';
|
||||
|
||||
$statement = $this->pdo->prepare($sql);
|
||||
$statement->execute($params);
|
||||
|
||||
return $statement->fetchColumn() !== false;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function fetchNext(int $limit = 1): array
|
||||
{
|
||||
$safeLimit = max(1, min(100, $limit));
|
||||
$now = date('Y-m-d H:i:s');
|
||||
|
||||
$this->pdo->beginTransaction();
|
||||
|
||||
try {
|
||||
$select = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, payload, result, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at, updated_at
|
||||
FROM cron_jobs
|
||||
WHERE status = :status
|
||||
AND scheduled_at <= :scheduled_at
|
||||
ORDER BY priority ASC, scheduled_at ASC, id ASC
|
||||
LIMIT :limit
|
||||
FOR UPDATE'
|
||||
);
|
||||
$select->bindValue(':status', 'pending');
|
||||
$select->bindValue(':scheduled_at', $now);
|
||||
$select->bindValue(':limit', $safeLimit, PDO::PARAM_INT);
|
||||
$select->execute();
|
||||
|
||||
$rows = $select->fetchAll();
|
||||
if (!is_array($rows) || $rows === []) {
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
$ids = array_values(array_map(
|
||||
static fn (array $row): int => (int) ($row['id'] ?? 0),
|
||||
array_filter($rows, static fn (mixed $row): bool => is_array($row))
|
||||
));
|
||||
$ids = array_values(array_filter($ids, static fn (int $id): bool => $id > 0));
|
||||
|
||||
if ($ids === []) {
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
$placeholders = implode(', ', array_fill(0, count($ids), '?'));
|
||||
$update = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs SET
|
||||
status = ?,
|
||||
started_at = ?,
|
||||
updated_at = ?
|
||||
WHERE id IN (' . $placeholders . ')'
|
||||
);
|
||||
$update->execute(array_merge(['processing', $now, $now], $ids));
|
||||
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return array_map([$this, 'mapJobRow'], $rows);
|
||||
} catch (Throwable $exception) {
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->rollBack();
|
||||
}
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
|
||||
public function markCompleted(int $jobId, ?array $result = null): void
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs SET
|
||||
status = :status,
|
||||
attempts = attempts + 1,
|
||||
result = :result,
|
||||
last_error = NULL,
|
||||
completed_at = :completed_at,
|
||||
updated_at = :updated_at
|
||||
WHERE id = :id'
|
||||
);
|
||||
|
||||
$now = date('Y-m-d H:i:s');
|
||||
$statement->execute([
|
||||
'id' => $jobId,
|
||||
'status' => 'completed',
|
||||
'result' => $this->encodeJson($result),
|
||||
'completed_at' => $now,
|
||||
'updated_at' => $now,
|
||||
]);
|
||||
}
|
||||
|
||||
public function markFailed(int $jobId, string $errorMessage, int $backoffSeconds = 60): bool
|
||||
{
|
||||
$this->pdo->beginTransaction();
|
||||
|
||||
try {
|
||||
$select = $this->pdo->prepare(
|
||||
'SELECT attempts, max_attempts
|
||||
FROM cron_jobs
|
||||
WHERE id = :id
|
||||
LIMIT 1
|
||||
FOR UPDATE'
|
||||
);
|
||||
$select->execute(['id' => $jobId]);
|
||||
|
||||
$row = $select->fetch();
|
||||
if (!is_array($row)) {
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
$attempts = (int) ($row['attempts'] ?? 0) + 1;
|
||||
$maxAttempts = max(1, (int) ($row['max_attempts'] ?? 1));
|
||||
$trimmedError = mb_substr(trim($errorMessage), 0, 500);
|
||||
$now = date('Y-m-d H:i:s');
|
||||
|
||||
if ($attempts >= $maxAttempts) {
|
||||
$update = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs SET
|
||||
status = :status,
|
||||
attempts = :attempts,
|
||||
last_error = :last_error,
|
||||
completed_at = :completed_at,
|
||||
updated_at = :updated_at
|
||||
WHERE id = :id'
|
||||
);
|
||||
$update->execute([
|
||||
'id' => $jobId,
|
||||
'status' => 'failed',
|
||||
'attempts' => $attempts,
|
||||
'last_error' => $trimmedError,
|
||||
'completed_at' => $now,
|
||||
'updated_at' => $now,
|
||||
]);
|
||||
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
$scheduledAt = (new DateTimeImmutable($now))
|
||||
->modify('+' . max(1, $backoffSeconds) . ' seconds')
|
||||
->format('Y-m-d H:i:s');
|
||||
|
||||
$update = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs SET
|
||||
status = :status,
|
||||
attempts = :attempts,
|
||||
last_error = :last_error,
|
||||
scheduled_at = :scheduled_at,
|
||||
started_at = NULL,
|
||||
completed_at = NULL,
|
||||
updated_at = :updated_at
|
||||
WHERE id = :id'
|
||||
);
|
||||
$update->execute([
|
||||
'id' => $jobId,
|
||||
'status' => 'pending',
|
||||
'attempts' => $attempts,
|
||||
'last_error' => $trimmedError,
|
||||
'scheduled_at' => $scheduledAt,
|
||||
'updated_at' => $now,
|
||||
]);
|
||||
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->commit();
|
||||
}
|
||||
|
||||
return false;
|
||||
} catch (Throwable $exception) {
|
||||
if ($this->pdo->inTransaction()) {
|
||||
$this->pdo->rollBack();
|
||||
}
|
||||
|
||||
throw $exception;
|
||||
}
|
||||
}
|
||||
|
||||
public function recoverStuck(int $olderThanMinutes = 15): int
|
||||
{
|
||||
$threshold = (new DateTimeImmutable())
|
||||
->modify('-' . max(1, $olderThanMinutes) . ' minutes')
|
||||
->format('Y-m-d H:i:s');
|
||||
$now = date('Y-m-d H:i:s');
|
||||
|
||||
$statement = $this->pdo->prepare(
|
||||
'UPDATE cron_jobs SET
|
||||
status = :status,
|
||||
started_at = NULL,
|
||||
scheduled_at = :scheduled_at,
|
||||
updated_at = :updated_at
|
||||
WHERE status = :processing_status
|
||||
AND started_at IS NOT NULL
|
||||
AND started_at < :threshold'
|
||||
);
|
||||
$statement->execute([
|
||||
'status' => 'pending',
|
||||
'processing_status' => 'processing',
|
||||
'scheduled_at' => $now,
|
||||
'updated_at' => $now,
|
||||
'threshold' => $threshold,
|
||||
]);
|
||||
|
||||
return $statement->rowCount();
|
||||
}
|
||||
|
||||
public function cleanup(int $olderThanDays = 30): int
|
||||
{
|
||||
$threshold = (new DateTimeImmutable())
|
||||
->modify('-' . max(1, $olderThanDays) . ' days')
|
||||
->format('Y-m-d H:i:s');
|
||||
|
||||
$statement = $this->pdo->prepare(
|
||||
'DELETE FROM cron_jobs
|
||||
WHERE status IN (\'completed\', \'failed\', \'cancelled\')
|
||||
AND completed_at IS NOT NULL
|
||||
AND completed_at < :threshold'
|
||||
);
|
||||
$statement->execute(['threshold' => $threshold]);
|
||||
|
||||
return $statement->rowCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function getDueSchedules(): array
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, interval_seconds, priority, max_attempts, payload,
|
||||
enabled, last_run_at, next_run_at, created_at, updated_at
|
||||
FROM cron_schedules
|
||||
WHERE enabled = 1
|
||||
AND (next_run_at IS NULL OR next_run_at <= :now)
|
||||
ORDER BY priority ASC, next_run_at ASC, id ASC'
|
||||
);
|
||||
$statement->execute(['now' => date('Y-m-d H:i:s')]);
|
||||
|
||||
$rows = $statement->fetchAll();
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map([$this, 'mapScheduleRow'], $rows);
|
||||
}
|
||||
|
||||
public function touchSchedule(int $scheduleId, int $intervalSeconds): void
|
||||
{
|
||||
$safeInterval = max(60, $intervalSeconds);
|
||||
$now = date('Y-m-d H:i:s');
|
||||
$nextRunAt = (new DateTimeImmutable($now))
|
||||
->modify('+' . $safeInterval . ' seconds')
|
||||
->format('Y-m-d H:i:s');
|
||||
|
||||
$statement = $this->pdo->prepare(
|
||||
'UPDATE cron_schedules SET
|
||||
last_run_at = :last_run_at,
|
||||
next_run_at = :next_run_at,
|
||||
updated_at = :updated_at
|
||||
WHERE id = :id'
|
||||
);
|
||||
$statement->execute([
|
||||
'id' => $scheduleId,
|
||||
'last_run_at' => $now,
|
||||
'next_run_at' => $nextRunAt,
|
||||
'updated_at' => $now,
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listPastJobs(int $limit = 100): array
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, payload, result, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at, updated_at
|
||||
FROM cron_jobs
|
||||
WHERE scheduled_at <= :now
|
||||
ORDER BY scheduled_at DESC, id DESC
|
||||
LIMIT :limit'
|
||||
);
|
||||
$statement->bindValue(':now', date('Y-m-d H:i:s'));
|
||||
$statement->bindValue(':limit', max(1, min(500, $limit)), PDO::PARAM_INT);
|
||||
$statement->execute();
|
||||
|
||||
$rows = $statement->fetchAll();
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map([$this, 'mapJobRow'], $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listFutureJobs(int $limit = 100): array
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, status, priority, payload, result, attempts, max_attempts,
|
||||
last_error, scheduled_at, started_at, completed_at, created_at, updated_at
|
||||
FROM cron_jobs
|
||||
WHERE scheduled_at > :now
|
||||
ORDER BY scheduled_at ASC, priority ASC, id ASC
|
||||
LIMIT :limit'
|
||||
);
|
||||
$statement->bindValue(':now', date('Y-m-d H:i:s'));
|
||||
$statement->bindValue(':limit', max(1, min(500, $limit)), PDO::PARAM_INT);
|
||||
$statement->execute();
|
||||
|
||||
$rows = $statement->fetchAll();
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map([$this, 'mapJobRow'], $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<int, array<string, mixed>>
|
||||
*/
|
||||
public function listSchedules(int $limit = 100): array
|
||||
{
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT id, job_type, interval_seconds, priority, max_attempts, payload,
|
||||
enabled, last_run_at, next_run_at, created_at, updated_at
|
||||
FROM cron_schedules
|
||||
ORDER BY priority ASC, job_type ASC
|
||||
LIMIT :limit'
|
||||
);
|
||||
$statement->bindValue(':limit', max(1, min(500, $limit)), PDO::PARAM_INT);
|
||||
$statement->execute();
|
||||
|
||||
$rows = $statement->fetchAll();
|
||||
if (!is_array($rows)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return array_map([$this, 'mapScheduleRow'], $rows);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed>|null $payload
|
||||
*/
|
||||
private function encodeJson(?array $payload): ?string
|
||||
{
|
||||
if ($payload === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$encoded = json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
|
||||
if ($encoded === false) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $encoded;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $row
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function mapJobRow(array $row): array
|
||||
{
|
||||
return [
|
||||
'id' => (int) ($row['id'] ?? 0),
|
||||
'job_type' => (string) ($row['job_type'] ?? ''),
|
||||
'status' => (string) ($row['status'] ?? ''),
|
||||
'priority' => (int) ($row['priority'] ?? 100),
|
||||
'payload' => $this->decodeJson($row['payload'] ?? null),
|
||||
'result' => $this->decodeJson($row['result'] ?? null),
|
||||
'attempts' => (int) ($row['attempts'] ?? 0),
|
||||
'max_attempts' => (int) ($row['max_attempts'] ?? 0),
|
||||
'last_error' => isset($row['last_error']) ? (string) $row['last_error'] : null,
|
||||
'scheduled_at' => isset($row['scheduled_at']) ? (string) $row['scheduled_at'] : null,
|
||||
'started_at' => isset($row['started_at']) ? (string) $row['started_at'] : null,
|
||||
'completed_at' => isset($row['completed_at']) ? (string) $row['completed_at'] : null,
|
||||
'created_at' => isset($row['created_at']) ? (string) $row['created_at'] : null,
|
||||
'updated_at' => isset($row['updated_at']) ? (string) $row['updated_at'] : null,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $row
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function mapScheduleRow(array $row): array
|
||||
{
|
||||
return [
|
||||
'id' => (int) ($row['id'] ?? 0),
|
||||
'job_type' => (string) ($row['job_type'] ?? ''),
|
||||
'interval_seconds' => (int) ($row['interval_seconds'] ?? 0),
|
||||
'priority' => (int) ($row['priority'] ?? 100),
|
||||
'max_attempts' => (int) ($row['max_attempts'] ?? 3),
|
||||
'payload' => $this->decodeJson($row['payload'] ?? null),
|
||||
'enabled' => ((int) ($row['enabled'] ?? 0)) === 1,
|
||||
'last_run_at' => isset($row['last_run_at']) ? (string) $row['last_run_at'] : null,
|
||||
'next_run_at' => isset($row['next_run_at']) ? (string) $row['next_run_at'] : null,
|
||||
'created_at' => isset($row['created_at']) ? (string) $row['created_at'] : null,
|
||||
'updated_at' => isset($row['updated_at']) ? (string) $row['updated_at'] : null,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array<string, mixed>|null
|
||||
*/
|
||||
private function decodeJson(mixed $value): ?array
|
||||
{
|
||||
if ($value === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$raw = trim((string) $value);
|
||||
if ($raw === '') {
|
||||
return null;
|
||||
}
|
||||
|
||||
$decoded = json_decode($raw, true);
|
||||
if (!is_array($decoded)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return $decoded;
|
||||
}
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
final class CronJobType
|
||||
{
|
||||
public const PRODUCT_LINKS_HEALTH_CHECK = 'product_links_health_check';
|
||||
public const SHOPPRO_OFFER_TITLES_REFRESH = 'shoppro_offer_titles_refresh';
|
||||
|
||||
public const PRIORITY_HIGH = 50;
|
||||
public const PRIORITY_NORMAL = 100;
|
||||
public const PRIORITY_LOW = 200;
|
||||
|
||||
public static function priorityFor(string $jobType): int
|
||||
{
|
||||
return match (trim($jobType)) {
|
||||
self::PRODUCT_LINKS_HEALTH_CHECK => 110,
|
||||
self::SHOPPRO_OFFER_TITLES_REFRESH => 170,
|
||||
default => self::PRIORITY_NORMAL,
|
||||
};
|
||||
}
|
||||
|
||||
public static function maxAttemptsFor(string $jobType): int
|
||||
{
|
||||
return match (trim($jobType)) {
|
||||
self::PRODUCT_LINKS_HEALTH_CHECK => 3,
|
||||
self::SHOPPRO_OFFER_TITLES_REFRESH => 3,
|
||||
default => 3,
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,140 +0,0 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Modules\ProductLinks\ChannelOffersRepository;
|
||||
use App\Modules\ProductLinks\OfferImportService;
|
||||
use App\Modules\ProductLinks\ProductLinksRepository;
|
||||
use App\Modules\Settings\IntegrationRepository;
|
||||
use Throwable;
|
||||
|
||||
final class ProductLinksHealthCheckHandler
|
||||
{
|
||||
private const ALERT_TYPE = 'missing_remote_link';
|
||||
private const ALERT_MESSAGE = 'Powiazanie nie istnieje juz po stronie zewnetrznej.';
|
||||
|
||||
public function __construct(
|
||||
private readonly IntegrationRepository $integrations,
|
||||
private readonly OfferImportService $offerImportService,
|
||||
private readonly ProductLinksRepository $links,
|
||||
private readonly ChannelOffersRepository $offers
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @param array<string, mixed> $job
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
public function __invoke(array $payload = [], array $job = []): array
|
||||
{
|
||||
$forcedIntegrationId = max(0, (int) ($payload['integration_id'] ?? 0));
|
||||
$activeIntegrations = array_values(array_filter(
|
||||
$this->integrations->listByType('shoppro'),
|
||||
static function (array $integration) use ($forcedIntegrationId): bool {
|
||||
$id = (int) ($integration['id'] ?? 0);
|
||||
if ($forcedIntegrationId > 0 && $id !== $forcedIntegrationId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return $id > 0
|
||||
&& ($integration['is_active'] ?? false) === true
|
||||
&& ($integration['has_api_key'] ?? false) === true;
|
||||
}
|
||||
));
|
||||
|
||||
if ($activeIntegrations === []) {
|
||||
return [
|
||||
'ok' => true,
|
||||
'message' => 'Brak aktywnych integracji z kluczem API do weryfikacji powiazan.',
|
||||
'checked_links' => 0,
|
||||
'missing_links' => 0,
|
||||
'integrations' => 0,
|
||||
'integration_failures' => 0,
|
||||
];
|
||||
}
|
||||
|
||||
$checkedLinks = 0;
|
||||
$missingLinks = 0;
|
||||
$resolvedAlerts = 0;
|
||||
$integrationFailures = 0;
|
||||
$errors = [];
|
||||
$checkedAt = date('Y-m-d H:i:s');
|
||||
|
||||
foreach ($activeIntegrations as $integration) {
|
||||
$integrationId = (int) ($integration['id'] ?? 0);
|
||||
if ($integrationId <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$credentials = $this->integrations->findApiCredentials($integrationId);
|
||||
} catch (Throwable $exception) {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': ' . $exception->getMessage();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($credentials === null || trim((string) ($credentials['api_key'] ?? '')) === '') {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': brak poprawnych danych API.';
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
$import = $this->offerImportService->importShopProOffers($credentials);
|
||||
if (($import['ok'] ?? false) !== true) {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': ' . trim((string) ($import['message'] ?? 'Blad importu ofert.'));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
$links = $this->links->listActiveLinksForMissingCheck($integrationId);
|
||||
foreach ($links as $link) {
|
||||
$mapId = (int) ($link['id'] ?? 0);
|
||||
$externalProductId = trim((string) ($link['external_product_id'] ?? ''));
|
||||
$externalVariantId = $this->nullableText($link['external_variant_id'] ?? null);
|
||||
if ($mapId <= 0 || $externalProductId === '') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$checkedLinks++;
|
||||
$offer = $this->offers->findByExternalIdentity($integrationId, $externalProductId, $externalVariantId);
|
||||
|
||||
if ($offer === null) {
|
||||
$missingLinks++;
|
||||
$this->links->upsertActiveAlert($mapId, self::ALERT_TYPE, self::ALERT_MESSAGE, $checkedAt);
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->links->resolveActiveAlert($mapId, self::ALERT_TYPE, $checkedAt);
|
||||
$resolvedAlerts++;
|
||||
}
|
||||
}
|
||||
|
||||
return [
|
||||
'ok' => $integrationFailures === 0,
|
||||
'message' => $integrationFailures === 0
|
||||
? 'Weryfikacja powiazan zakonczona.'
|
||||
: 'Weryfikacja zakonczona z bledami integracji.',
|
||||
'checked_links' => $checkedLinks,
|
||||
'missing_links' => $missingLinks,
|
||||
'resolved_alerts' => $resolvedAlerts,
|
||||
'integrations' => count($activeIntegrations),
|
||||
'integration_failures' => $integrationFailures,
|
||||
'errors' => $errors,
|
||||
];
|
||||
}
|
||||
|
||||
private function nullableText(mixed $value): ?string
|
||||
{
|
||||
$text = trim((string) $value);
|
||||
return $text === '' ? null : $text;
|
||||
}
|
||||
}
|
||||
@@ -1,106 +0,0 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Cron;
|
||||
|
||||
use App\Modules\ProductLinks\OfferImportService;
|
||||
use App\Modules\Settings\IntegrationRepository;
|
||||
use Throwable;
|
||||
|
||||
final class ShopProOfferTitlesRefreshHandler
|
||||
{
|
||||
public function __construct(
|
||||
private readonly IntegrationRepository $integrations,
|
||||
private readonly OfferImportService $offerImportService
|
||||
) {
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $payload
|
||||
* @param array<string, mixed> $job
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
public function __invoke(array $payload = [], array $job = []): array
|
||||
{
|
||||
$forcedIntegrationId = max(0, (int) ($payload['integration_id'] ?? 0));
|
||||
$activeIntegrations = array_values(array_filter(
|
||||
$this->integrations->listByType('shoppro'),
|
||||
static function (array $integration) use ($forcedIntegrationId): bool {
|
||||
$id = (int) ($integration['id'] ?? 0);
|
||||
if ($forcedIntegrationId > 0 && $id !== $forcedIntegrationId) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return $id > 0
|
||||
&& ($integration['is_active'] ?? false) === true
|
||||
&& ($integration['has_api_key'] ?? false) === true;
|
||||
}
|
||||
));
|
||||
|
||||
if ($activeIntegrations === []) {
|
||||
return [
|
||||
'ok' => true,
|
||||
'message' => 'Brak aktywnych integracji z kluczem API do odswiezenia tytulow ofert.',
|
||||
'integrations' => 0,
|
||||
'updated_offers' => 0,
|
||||
'failed_offers' => 0,
|
||||
'integration_failures' => 0,
|
||||
'errors' => [],
|
||||
];
|
||||
}
|
||||
|
||||
$updatedOffers = 0;
|
||||
$failedOffers = 0;
|
||||
$integrationFailures = 0;
|
||||
$errors = [];
|
||||
|
||||
foreach ($activeIntegrations as $integration) {
|
||||
$integrationId = (int) ($integration['id'] ?? 0);
|
||||
if ($integrationId <= 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
$credentials = $this->integrations->findApiCredentials($integrationId);
|
||||
} catch (Throwable $exception) {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': ' . $exception->getMessage();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($credentials === null || trim((string) ($credentials['api_key'] ?? '')) === '') {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': brak poprawnych danych API.';
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
$import = $this->offerImportService->importShopProOffers($credentials);
|
||||
if (($import['ok'] ?? false) !== true) {
|
||||
$integrationFailures++;
|
||||
if (count($errors) < 5) {
|
||||
$errors[] = 'Integracja #' . $integrationId . ': ' . trim((string) ($import['message'] ?? 'Blad importu ofert.'));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
$updatedOffers += (int) ($import['imported'] ?? 0);
|
||||
$failedOffers += (int) ($import['failed'] ?? 0);
|
||||
}
|
||||
|
||||
return [
|
||||
'ok' => $integrationFailures === 0,
|
||||
'message' => $integrationFailures === 0
|
||||
? 'Odswiezenie tytulow ofert zakonczone.'
|
||||
: 'Odswiezenie tytulow zakonczone z bledami integracji.',
|
||||
'integrations' => count($activeIntegrations),
|
||||
'updated_offers' => $updatedOffers,
|
||||
'failed_offers' => $failedOffers,
|
||||
'integration_failures' => $integrationFailures,
|
||||
'errors' => $errors,
|
||||
];
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user