$payload Optional run options; a positive 'integration_id' restricts the run to that integration.
     * @return array Run summary with the keys ok, message, checked_integrations, processed_orders,
     *               failed_integrations and errors (at most 10 messages are collected).
     */
    public function sync(array $payload = []): array
    {
        $forcedIntegrationId = max(0, (int) ($payload['integration_id'] ?? 0));

        // Keep only active "shoppro" integrations that have an API key and, when an id was
        // forced through the payload, match that id.
        $integrations = array_values(array_filter(
            $this->integrations->listByType('shoppro'),
            static function (array $integration) use ($forcedIntegrationId): bool {
                $integrationId = (int) ($integration['id'] ?? 0);
                if ($integrationId <= 0) {
                    return false;
                }
                if ($forcedIntegrationId > 0 && $integrationId !== $forcedIntegrationId) {
                    return false;
                }

                return ($integration['is_active'] ?? false) === true
                    && ($integration['has_api_key'] ?? false) === true;
            }
        ));

        if ($integrations === []) {
            return [
                'ok' => true,
                'message' => 'Brak aktywnych integracji do synchronizacji statusow.',
                'checked_integrations' => 0,
                'processed_orders' => 0,
                'failed_integrations' => 0,
                'errors' => [],
            ];
        }

        $processedOrders = 0;
        $failedIntegrations = 0;
        $errors = [];

        foreach ($integrations as $integration) {
            $integrationId = (int) ($integration['id'] ?? 0);
            if ($integrationId <= 0) {
                continue;
            }

            // Resolved once: the same direction selects the sync routine and labels the
            // state row when the run fails.
            $direction = $this->normalizeDirection((string) ($integration['order_status_sync_direction'] ?? ''));

            try {
                $credentials = $this->integrations->findApiCredentials($integrationId);
                if ($credentials === null || trim((string) ($credentials['api_key'] ?? '')) === '') {
                    throw new \RuntimeException('Brak poprawnych danych API.');
                }

                $result = $direction === self::DIRECTION_ORDERPRO_TO_SHOPPRO
                    ? $this->syncOrderProToShopPro($integrationId, $credentials)
                    : $this->syncShopProToOrderPro($integrationId, $credentials, $integration);

                $processedOrders += (int) ($result['processed_orders'] ?? 0);
            } catch (\Throwable $exception) {
                // Fully qualified so the handler matches regardless of the file's namespace
                // and imports. One failing integration must not stop the remaining ones.
                $failedIntegrations++;
                $this->touchState($integrationId, $direction, $exception->getMessage());

                if (count($errors) < 10) {
                    $errors[] = 'Integracja #' . $integrationId . ': ' . $exception->getMessage();
                }
            }
        }

        return [
            'ok' => $failedIntegrations === 0,
            'message' => $failedIntegrations === 0
                ? 'Synchronizacja statusow zamowien zakonczona.'
                : 'Synchronizacja statusow zakonczona z bledami.',
            'checked_integrations' => count($integrations),
            'processed_orders' => $processedOrders,
            'failed_integrations' => $failedIntegrations,
            'errors' => $errors,
        ];
    }

    /**
     * Pulls order statuses from shopPRO and applies them to the matching local orders.
     *
     * Orders are handled in (updated_at, order id) order and the stored cursor is advanced
     * after every candidate, so an interrupted run resumes where it stopped.
     *
     * @param array $credentials Read keys: base_url, api_key, timeout_seconds (defaults to 10).
     * @param array $integration Read key: orders_fetch_start_date (Y-m-d).
     * @return array{processed_orders:int} Number of local orders whose status was changed.
     * @throws \RuntimeException When the shopPRO response is not flagged as ok.
     */
    private function syncShopProToOrderPro(int $integrationId, array $credentials, array $integration): array
    {
        $direction = self::DIRECTION_SHOPPRO_TO_ORDERPRO;
        $state = $this->findState($integrationId, $direction);
        $cursorAt = $this->normalizeDateTime($state['last_synced_at'] ?? null);
        $cursorRef = trim((string) ($state['last_synced_order_ref'] ?? ''));

        $fromDate = $this->resolveFromDate(
            $this->normalizeDateOnly((string) ($integration['orders_fetch_start_date'] ?? '')),
            $cursorAt
        );

        // NOTE(review): the literals 1 and 100 are presumably the page number and page size;
        // only this single request is made per run - confirm against the client.
        $response = $this->shopProClient->fetchOrders(
            (string) ($credentials['base_url'] ?? ''),
            (string) ($credentials['api_key'] ?? ''),
            (int) ($credentials['timeout_seconds'] ?? 10),
            1,
            100,
            $fromDate
        );

        if (($response['ok'] ?? false) !== true) {
            $message = trim((string) ($response['message'] ?? 'Blad pobierania statusow z shopPRO.'));
            $this->touchState($integrationId, $direction, $message);

            throw new \RuntimeException($message);
        }

        $items = is_array($response['items'] ?? null) ? $response['items'] : [];
        $candidates = $this->buildShopProCandidates($items, $cursorAt, $cursorRef);
        if ($candidates === []) {
            // Nothing new: record the run and clear any previous error.
            $this->touchState($integrationId, $direction, null);

            return ['processed_orders' => 0];
        }

        $processed = 0;
        foreach ($candidates as $candidate) {
            $externalOrderId = (string) ($candidate['external_order_id'] ?? '');
            $externalUpdatedAt = (string) ($candidate['external_updated_at'] ?? '');
            $status = trim((string) ($candidate['status'] ?? ''));
            if ($externalOrderId === '' || $externalUpdatedAt === '') {
                continue;
            }

            $local = $this->orders->findByIntegrationExternalOrderId($integrationId, $externalOrderId);
            if ($local !== null && $status !== '') {
                $localStatus = trim((string) ($local['status'] ?? ''));
                // Case-insensitive comparison: only a real status change counts as processed.
                if (mb_strtolower($localStatus) !== mb_strtolower($status)) {
                    $this->orders->updateStatus((int) ($local['id'] ?? 0), $status, $externalUpdatedAt);
                    $processed++;
                }
            }

            // The cursor moves even when no local order matched, so the candidate is not retried.
            $this->advanceState($integrationId, $direction, $externalUpdatedAt, $externalOrderId);
        }

        return ['processed_orders' => $processed];
    }

    /**
     * Pushes local order statuses to shopPRO using the orderPRO -> shopPRO status mapping.
     *
     * Rows without an external order id or without a non-empty mapped status are skipped,
     * but still advance the cursor.
     *
     * @param array $credentials Read keys: base_url, api_key, timeout_seconds (defaults to 10).
     * @return array{processed_orders:int} Number of orders successfully updated in shopPRO.
     * @throws \RuntimeException When a shopPRO status update is not flagged as ok.
     */
    private function syncOrderProToShopPro(int $integrationId, array $credentials): array
    {
        $direction = self::DIRECTION_ORDERPRO_TO_SHOPPRO;
        $state = $this->findState($integrationId, $direction);
        $cursorAt = $this->normalizeDateTime($state['last_synced_at'] ?? null);
        $cursorOrderId = max(0, (int) ($state['last_synced_order_ref'] ?? 0));

        $rows = $this->orders->listForStatusPush($integrationId, $cursorAt, $cursorOrderId, 100);
        if ($rows === []) {
            $this->touchState($integrationId, $direction, null);

            return ['processed_orders' => 0];
        }

        $mapping = $this->mappings->listOrderProToShopProMap($integrationId);
        $processed = 0;

        foreach ($rows as $row) {
            $orderId = (int) ($row['id'] ?? 0);
            $externalOrderId = trim((string) ($row['external_order_id'] ?? ''));
            $orderProStatus = $this->normalizeCode((string) ($row['status'] ?? ''));
            $updatedAt = (string) ($row['updated_at'] ?? '');
            if ($orderId <= 0 || $updatedAt === '') {
                continue;
            }

            // An unmapped or blank status resolves to '' and is skipped below.
            $shopStatusCode = $orderProStatus !== '' && isset($mapping[$orderProStatus])
                ? trim((string) $mapping[$orderProStatus])
                : '';

            if ($externalOrderId === '' || $shopStatusCode === '') {
                $this->advanceState($integrationId, $direction, $updatedAt, (string) $orderId);
                continue;
            }

            $response = $this->shopProClient->updateOrderStatus(
                (string) ($credentials['base_url'] ?? ''),
                (string) ($credentials['api_key'] ?? ''),
                (int) ($credentials['timeout_seconds'] ?? 10),
                $externalOrderId,
                $shopStatusCode
            );

            if (($response['ok'] ?? false) !== true) {
                $message = trim((string) ($response['message'] ?? 'Blad aktualizacji statusu zamowienia w shopPRO.'));
                $this->touchState($integrationId, $direction, $message);

                // The cursor is left on the last successful row, so this order is retried next run.
                throw new \RuntimeException($message);
            }

            $this->advanceState($integrationId, $direction, $updatedAt, (string) $orderId);
            $processed++;
        }

        return ['processed_orders' => $processed];
    }

    /**
     * Turns raw shopPRO order items into sorted sync candidates located after the cursor.
     *
     * Items that are not arrays, or that lack an order id, a parseable timestamp or a status,
     * are dropped. The result is ordered by timestamp, then by order reference.
     *
     * @param array $items Raw items from the shopPRO response.
     * @return array List of arrays with the keys external_order_id, external_updated_at, status.
     */
    private function buildShopProCandidates(array $items, ?string $cursorAt, string $cursorRef): array
    {
        $result = [];

        foreach ($items as $item) {
            if (!is_array($item)) {
                continue;
            }

            $externalOrderId = $this->normalizeOrderId($this->readPath($item, ['id', 'order_id', 'external_order_id']));
            $externalUpdatedAt = $this->normalizeDateTime($this->readPath($item, [
                'updated_at',
                'date_updated',
                'modified_at',
                'date_modified',
                'created_at',
                'date_created',
            ]));
            // A non-scalar status (e.g. a nested structure) resolves to '' and drops the item
            // instead of being stringified into a bogus status value.
            $status = $this->stringifyScalar($this->readPath($item, ['status', 'order_status']));

            if ($externalOrderId === '' || $externalUpdatedAt === null || $status === '') {
                continue;
            }
            if (!$this->isAfterCursor($externalUpdatedAt, $externalOrderId, $cursorAt, $cursorRef)) {
                continue;
            }

            $result[] = [
                'external_order_id' => $externalOrderId,
                'external_updated_at' => $externalUpdatedAt,
                'status' => $status,
            ];
        }

        // Timestamps are normalized to 'Y-m-d H:i:s', so byte-wise order is chronological order.
        usort($result, function (array $a, array $b): int {
            $dateCmp = strcmp((string) ($a['external_updated_at'] ?? ''), (string) ($b['external_updated_at'] ?? ''));
            if ($dateCmp !== 0) {
                return $dateCmp;
            }

            return $this->compareOrderRef(
                (string) ($a['external_order_id'] ?? ''),
                (string) ($b['external_order_id'] ?? '')
            );
        });

        return $result;
    }

    /**
     * Tells whether an item lies strictly after the stored (timestamp, reference) cursor.
     *
     * Without a cursor timestamp every item qualifies. With an equal timestamp the order
     * reference breaks the tie; an empty cursor reference lets the item through.
     */
    private function isAfterCursor(string $itemAt, string $itemRef, ?string $cursorAt, string $cursorRef): bool
    {
        if ($cursorAt === null) {
            return true;
        }

        $dateCmp = strcmp($itemAt, $cursorAt);
        if ($dateCmp !== 0) {
            return $dateCmp > 0;
        }
        if ($cursorRef === '') {
            return true;
        }

        return $this->compareOrderRef($itemRef, $cursorRef) > 0;
    }

    /**
     * Compares two order references: numerically when both consist only of digits,
     * byte-wise otherwise.
     *
     * @return int Negative, zero or positive when $left is lower than, equal to or greater than $right.
     */
    private function compareOrderRef(string $left, string $right): int
    {
        $leftRaw = trim($left);
        $rightRaw = trim($right);

        if (ctype_digit($leftRaw) && ctype_digit($rightRaw)) {
            // Compare as digit strings instead of casting to int: integer conversion is not
            // reliable beyond PHP_INT_MAX and would make distinct large ids compare as equal.
            $leftDigits = ltrim($leftRaw, '0');
            $rightDigits = ltrim($rightRaw, '0');

            return strlen($leftDigits) <=> strlen($rightDigits)
                ?: (strcmp($leftDigits, $rightDigits) <=> 0);
        }

        return strcmp($leftRaw, $rightRaw);
    }

    /**
     * Picks the date to fetch orders from: the later of the integration start date and the
     * date part of the cursor, or whichever one is available.
     *
     * @return string|null A Y-m-d date, or null when neither input is set.
     */
    private function resolveFromDate(?string $integrationStartDate, ?string $cursorDateTime): ?string
    {
        // The cursor is a 'Y-m-d H:i:s' string; its first 10 characters are the date.
        $cursorDate = $cursorDateTime !== null ? substr($cursorDateTime, 0, 10) : null;

        if ($integrationStartDate === null) {
            return $cursorDate;
        }
        if ($cursorDate === null) {
            return $integrationStartDate;
        }

        return strcmp($integrationStartDate, $cursorDate) > 0 ? $integrationStartDate : $cursorDate;
    }

    /**
     * Maps a raw direction value onto one of the two direction constants; anything that is
     * not the orderPRO -> shopPRO value falls back to shopPRO -> orderPRO.
     */
    private function normalizeDirection(string $value): string
    {
        return trim(mb_strtolower($value)) === self::DIRECTION_ORDERPRO_TO_SHOPPRO
            ? self::DIRECTION_ORDERPRO_TO_SHOPPRO
            : self::DIRECTION_SHOPPRO_TO_ORDERPRO;
    }

    /**
     * Loads the stored sync state row for an integration and direction.
     *
     * @return array|null The row, or null when the fetch does not yield an array.
     */
    private function findState(int $integrationId, string $direction): ?array
    {
        $stmt = $this->pdo->prepare(
            'SELECT integration_id, direction, last_synced_at, last_synced_order_ref, last_run_at, last_error'
            . ' FROM integration_order_status_sync_state'
            . ' WHERE integration_id = :integration_id AND direction = :direction'
            . ' LIMIT 1'
        );
        $stmt->execute([
            'integration_id' => $integrationId,
            'direction' => $direction,
        ]);

        $row = $stmt->fetch();

        return is_array($row) ? $row : null;
    }

    /**
     * Records that a run happened (and its error, if any) without moving the cursor.
     *
     * Inserts the state row when missing; otherwise only last_run_at, last_error and
     * updated_at are overwritten. A blank error is stored as NULL.
     */
    private function touchState(int $integrationId, string $direction, ?string $error): void
    {
        $now = date('Y-m-d H:i:s');

        $stmt = $this->pdo->prepare(
            'INSERT INTO integration_order_status_sync_state ('
            . ' integration_id, direction, last_synced_at, last_synced_order_ref,'
            . ' last_run_at, last_error, created_at, updated_at'
            . ' ) VALUES ('
            . ' :integration_id, :direction, NULL, NULL, :last_run_at, :last_error, :created_at, :updated_at'
            . ' ) ON DUPLICATE KEY UPDATE'
            . ' last_run_at = VALUES(last_run_at),'
            . ' last_error = VALUES(last_error),'
            . ' updated_at = VALUES(updated_at)'
        );
        $stmt->execute([
            'integration_id' => $integrationId,
            'direction' => $direction,
            'last_run_at' => $now,
            'last_error' => $this->nullableString($error),
            'created_at' => $now,
            'updated_at' => $now,
        ]);
    }

    /**
     * Moves the cursor to the given (timestamp, reference) pair and clears the last error.
     *
     * Inserts the state row when missing; otherwise the cursor columns, last_run_at and
     * updated_at are overwritten and last_error is reset to NULL.
     */
    private function advanceState(int $integrationId, string $direction, string $cursorAt, string $cursorRef): void
    {
        $now = date('Y-m-d H:i:s');

        $stmt = $this->pdo->prepare(
            'INSERT INTO integration_order_status_sync_state ('
            . ' integration_id, direction, last_synced_at, last_synced_order_ref,'
            . ' last_run_at, last_error, created_at, updated_at'
            . ' ) VALUES ('
            . ' :integration_id, :direction, :last_synced_at, :last_synced_order_ref,'
            . ' :last_run_at, NULL, :created_at, :updated_at'
            . ' ) ON DUPLICATE KEY UPDATE'
            . ' last_synced_at = VALUES(last_synced_at),'
            . ' last_synced_order_ref = VALUES(last_synced_order_ref),'
            . ' last_run_at = VALUES(last_run_at),'
            . ' last_error = NULL,'
            . ' updated_at = VALUES(updated_at)'
        );
        $stmt->execute([
            'integration_id' => $integrationId,
            'direction' => $direction,
            'last_synced_at' => $cursorAt,
            'last_synced_order_ref' => $cursorRef,
            'last_run_at' => $now,
            'created_at' => $now,
            'updated_at' => $now,
        ]);
    }

    /**
     * Returns the value found under the first path that exists in $data.
     *
     * Each path may use dot notation to descend into nested arrays. A path counts as found
     * even when its value is null.
     *
     * @param array $data  Structure to read from.
     * @param array $paths Candidate paths, tried in order.
     * @return mixed The first value found, or null when no path exists.
     */
    private function readPath(array $data, array $paths): mixed
    {
        foreach ($paths as $path) {
            $current = $data;
            $found = true;

            foreach (explode('.', (string) $path) as $segment) {
                if (!is_array($current) || !array_key_exists($segment, $current)) {
                    $found = false;
                    break;
                }
                $current = $current[$segment];
            }

            if ($found) {
                return $current;
            }
        }

        return null;
    }

    /**
     * Converts a loosely typed value to a trimmed string.
     *
     * Arrays, null and objects that are not Stringable yield '' rather than the literal
     * "Array" (plus a warning) or an Error from the string cast.
     */
    private function stringifyScalar(mixed $value): string
    {
        if (is_scalar($value) || $value instanceof \Stringable) {
            return trim((string) $value);
        }

        return '';
    }

    /**
     * Normalizes an external order id to a trimmed string ('' when absent or non-scalar).
     */
    private function normalizeOrderId(mixed $value): string
    {
        return $this->stringifyScalar($value);
    }

    /**
     * Accepts only values shaped exactly like YYYY-MM-DD; anything else yields null.
     */
    private function normalizeDateOnly(mixed $value): ?string
    {
        $text = $this->stringifyScalar($value);

        return preg_match('/^\d{4}-\d{2}-\d{2}$/', $text) === 1 ? $text : null;
    }

    /**
     * Parses a date/time value with strtotime() and reformats it as 'Y-m-d H:i:s'.
     *
     * @return string|null The normalized timestamp, or null when the value is empty or unparseable.
     */
    private function normalizeDateTime(mixed $value): ?string
    {
        $text = $this->stringifyScalar($value);
        if ($text === '') {
            return null;
        }

        $timestamp = strtotime($text);

        return $timestamp === false ? null : date('Y-m-d H:i:s', $timestamp);
    }

    /**
     * Lower-cases and trims a status code so it can be used as a mapping key.
     */
    private function normalizeCode(string $value): string
    {
        return trim(mb_strtolower($value));
    }

    /**
     * Trims a value and converts an empty result to null.
     */
    private function nullableString(mixed $value): ?string
    {
        $text = $this->stringifyScalar($value);

        return $text === '' ? null : $text;
    }
}