feat(128): erli orders import
Phase 128 complete: - add Erli /inbox order import with safe mark-read ACK - add cron/manual import controls and sync state tracking - map Erli orders into orderPRO aggregates with mapper tests and docs
This commit is contained in:
143
src/Modules/Settings/ErliOrderSyncStateRepository.php
Normal file
143
src/Modules/Settings/ErliOrderSyncStateRepository.php
Normal file
@@ -0,0 +1,143 @@
|
||||
<?php
|
||||
declare(strict_types=1);
|
||||
|
||||
namespace App\Modules\Settings;
|
||||
|
||||
use App\Core\Support\StringHelper;
|
||||
use DateTimeImmutable;
|
||||
use PDO;
|
||||
use Throwable;
|
||||
|
||||
final class ErliOrderSyncStateRepository
|
||||
{
|
||||
public function __construct(private readonly PDO $pdo)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{last_synced_updated_at:?string,last_synced_source_order_id:?string,last_run_at:?string,last_success_at:?string,last_error:?string}
|
||||
*/
|
||||
public function getState(int $integrationId): array
|
||||
{
|
||||
$default = $this->defaultState();
|
||||
if ($integrationId <= 0) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
try {
|
||||
$statement = $this->pdo->prepare(
|
||||
'SELECT last_synced_order_updated_at,
|
||||
last_synced_source_order_id,
|
||||
last_run_at,
|
||||
last_success_at,
|
||||
last_error
|
||||
FROM integration_order_sync_state
|
||||
WHERE integration_id = :integration_id
|
||||
LIMIT 1'
|
||||
);
|
||||
$statement->execute(['integration_id' => $integrationId]);
|
||||
$row = $statement->fetch(PDO::FETCH_ASSOC);
|
||||
} catch (Throwable) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
if (!is_array($row)) {
|
||||
return $default;
|
||||
}
|
||||
|
||||
return [
|
||||
'last_synced_updated_at' => StringHelper::nullableString((string) ($row['last_synced_order_updated_at'] ?? '')),
|
||||
'last_synced_source_order_id' => StringHelper::nullableString((string) ($row['last_synced_source_order_id'] ?? '')),
|
||||
'last_run_at' => StringHelper::nullableString((string) ($row['last_run_at'] ?? '')),
|
||||
'last_success_at' => StringHelper::nullableString((string) ($row['last_success_at'] ?? '')),
|
||||
'last_error' => StringHelper::nullableString((string) ($row['last_error'] ?? '')),
|
||||
];
|
||||
}
|
||||
|
||||
public function markRunStarted(int $integrationId, DateTimeImmutable $now): void
|
||||
{
|
||||
$this->upsertState($integrationId, [
|
||||
'last_run_at' => $now->format('Y-m-d H:i:s'),
|
||||
]);
|
||||
}
|
||||
|
||||
public function markRunSuccess(
|
||||
int $integrationId,
|
||||
DateTimeImmutable $now,
|
||||
?string $lastMessageCreatedAt,
|
||||
?string $lastMessageId
|
||||
): void {
|
||||
$this->upsertState($integrationId, [
|
||||
'last_run_at' => $now->format('Y-m-d H:i:s'),
|
||||
'last_success_at' => $now->format('Y-m-d H:i:s'),
|
||||
'last_error' => null,
|
||||
'last_synced_order_updated_at' => $lastMessageCreatedAt,
|
||||
'last_synced_source_order_id' => $lastMessageId,
|
||||
]);
|
||||
}
|
||||
|
||||
public function markRunFailed(int $integrationId, DateTimeImmutable $now, string $error): void
|
||||
{
|
||||
$this->upsertState($integrationId, [
|
||||
'last_run_at' => $now->format('Y-m-d H:i:s'),
|
||||
'last_error' => mb_substr(trim($error), 0, 500),
|
||||
]);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array{last_synced_updated_at:?string,last_synced_source_order_id:?string,last_run_at:?string,last_success_at:?string,last_error:?string}
|
||||
*/
|
||||
private function defaultState(): array
|
||||
{
|
||||
return [
|
||||
'last_synced_updated_at' => null,
|
||||
'last_synced_source_order_id' => null,
|
||||
'last_run_at' => null,
|
||||
'last_success_at' => null,
|
||||
'last_error' => null,
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<string, mixed> $changes
|
||||
*/
|
||||
private function upsertState(int $integrationId, array $changes): void
|
||||
{
|
||||
if ($integrationId <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
$columns = ['integration_id', 'created_at', 'updated_at'];
|
||||
$values = [':integration_id', 'NOW()', 'NOW()'];
|
||||
$updates = ['updated_at = NOW()'];
|
||||
$params = ['integration_id' => $integrationId];
|
||||
|
||||
foreach ($changes as $column => $value) {
|
||||
$allowed = [
|
||||
'last_run_at',
|
||||
'last_success_at',
|
||||
'last_error',
|
||||
'last_synced_order_updated_at',
|
||||
'last_synced_source_order_id',
|
||||
];
|
||||
if (!in_array($column, $allowed, true)) {
|
||||
continue;
|
||||
}
|
||||
$columns[] = $column;
|
||||
$values[] = ':' . $column;
|
||||
$updates[] = $column . ' = VALUES(' . $column . ')';
|
||||
$params[$column] = $value;
|
||||
}
|
||||
|
||||
try {
|
||||
$statement = $this->pdo->prepare(
|
||||
'INSERT INTO integration_order_sync_state (' . implode(', ', $columns) . ')
|
||||
VALUES (' . implode(', ', $values) . ')
|
||||
ON DUPLICATE KEY UPDATE ' . implode(', ', $updates)
|
||||
);
|
||||
$statement->execute($params);
|
||||
} catch (Throwable) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user