- Added PHP support to project configuration. - Updated FTP configuration to exclude additional directories. - Changed remote database host in config.php and enabled debug mode. - Removed outdated TODO from documentation and created a new CRON_QUEUE_PLAN.md. - Introduced a new cron job queue system using database tables pp_cron_jobs and pp_cron_schedules. - Refactored cron job orchestration to improve management and reliability. - Updated OrderAdminService to use the new queue system and removed old file-based logic. - Added migration scripts for new database structure.
157 lines
6.9 KiB
Markdown
157 lines
6.9 KiB
Markdown
# Plan: System kolejki zadań cron oparty o bazę danych
|
|
|
|
## Kontekst
|
|
|
|
Obecny system cron ma dwa problemy:
|
|
1. **Kolejka plikowa (JSON)** — sync płatności/statusów Apilo trzymany w `/temp/apilo-sync-queue.json` — kruchy, brak transakcji, ryzyko utraty danych
|
|
2. **Monolityczny cron.php** (~550 linii) — brak priorytetów, brak retry z backoff, brak centralnego zarządzania
|
|
|
|
Cel: Zastąpienie całego systemu cron tabelą `pp_cron_jobs` z priorytetami, retry/backoff i harmonogramem `pp_cron_schedules`.
|
|
|
|
## Nowe pliki
|
|
|
|
| Plik | Opis |
|
|
|------|------|
|
|
| `autoload/Domain/CronJob/CronJobType.php` | Stałe typów zadań i priorytetów |
|
|
| `autoload/Domain/CronJob/CronJobRepository.php` | CRUD na `pp_cron_jobs` + `pp_cron_schedules` |
|
|
| `autoload/Domain/CronJob/CronJobProcessor.php` | Orkiestracja: pobierz zadanie → wywołaj handler → obsłuż wynik |
|
|
| `tests/Unit/Domain/CronJob/CronJobTypeTest.php` | Testy stałych |
|
|
| `tests/Unit/Domain/CronJob/CronJobRepositoryTest.php` | Testy repozytorium |
|
|
| `tests/Unit/Domain/CronJob/CronJobProcessorTest.php` | Testy procesora |
|
|
| `migrations/0.315.sql` | CREATE TABLE + INSERT harmonogramów |
|
|
|
|
## Modyfikowane pliki
|
|
|
|
| Plik | Zmiana |
|
|
|------|--------|
|
|
| `cron.php` | Zastąpienie ~550 linii orchestratorem (~100 linii) z rejestracją handlerów |
|
|
| `cron/cron-xml.php` | Usunięcie — logika przeniesiona do handlera `google_xml_feed` |
|
|
| `cron-turstmate.php` | Usunięcie — logika przeniesiona do handlera `trustmate_invitation` |
|
|
| `autoload/Domain/Order/OrderAdminService.php` | `queueApiloSync()` → enqueue do DB; usunięcie metod plikowych; `syncApiloPayment()`/`syncApiloStatus()` → public |
|
|
| `tests/Unit/Domain/Order/OrderAdminServiceTest.php` | Refaktor testów kolejki: mock `CronJobRepository` zamiast pliku JSON |
|
|
| `docs/DATABASE_STRUCTURE.md` | Dodanie tabel `pp_cron_jobs`, `pp_cron_schedules` |
|
|
| `docs/CHANGELOG.md` | Wpis o nowym systemie |
|
|
|
|
## Schemat DB (`migrations/0.315.sql`)
|
|
|
|
### `pp_cron_jobs`
|
|
```sql
|
|
CREATE TABLE pp_cron_jobs (
|
|
id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
|
|
job_type VARCHAR(50) NOT NULL,
|
|
status ENUM('pending','processing','completed','failed','cancelled') NOT NULL DEFAULT 'pending',
|
|
priority TINYINT UNSIGNED NOT NULL DEFAULT 100, -- niższy = ważniejszy
|
|
payload TEXT NULL, -- JSON z danymi zadania
|
|
result TEXT NULL, -- JSON z wynikiem
|
|
attempts SMALLINT UNSIGNED NOT NULL DEFAULT 0,
|
|
max_attempts SMALLINT UNSIGNED NOT NULL DEFAULT 10,
|
|
last_error VARCHAR(500) NULL,
|
|
scheduled_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
started_at DATETIME NULL,
|
|
completed_at DATETIME NULL,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
INDEX idx_status_priority_scheduled (status, priority, scheduled_at),
|
|
INDEX idx_job_type (job_type),
|
|
INDEX idx_status (status)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
|
```
|
|
|
|
### `pp_cron_schedules`
|
|
```sql
|
|
CREATE TABLE pp_cron_schedules (
|
|
id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
|
|
job_type VARCHAR(50) NOT NULL UNIQUE,
|
|
interval_seconds INT UNSIGNED NOT NULL,
|
|
priority TINYINT UNSIGNED NOT NULL DEFAULT 100,
|
|
max_attempts SMALLINT UNSIGNED NOT NULL DEFAULT 3,
|
|
payload TEXT NULL,
|
|
enabled TINYINT(1) NOT NULL DEFAULT 1,
|
|
last_run_at DATETIME NULL,
|
|
next_run_at DATETIME NULL,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
INDEX idx_enabled_next_run (enabled, next_run_at)
|
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
|
```
|
|
|
|
## Typy zadań i priorytety
|
|
|
|
| Typ | Priorytet | Harmonogram |
|
|
|-----|-----------|-------------|
|
|
| `apilo_token_keepalive` | 10 (krytyczny) | co 4 min |
|
|
| `apilo_send_order` | 50 (wysoki) | co 1 min |
|
|
| `apilo_sync_payment` | 50 (wysoki) | event-driven (enqueue przy zmianie) |
|
|
| `apilo_sync_status` | 50 (wysoki) | event-driven |
|
|
| `apilo_product_sync` | 100 (normalny) | co 10 min |
|
|
| `apilo_pricelist_sync` | 100 (normalny) | co 1h |
|
|
| `apilo_status_poll` | 100 (normalny) | co 10 min |
|
|
| `price_history` | 100 (normalny) | co 24h |
|
|
| `order_analysis` | 100 (normalny) | co 10 min |
|
|
| `trustmate_invitation` | 200 (niski) | co 10 min |
|
|
| `google_xml_feed` | 200 (niski) | co 1h |
|
|
|
|
## Architektura klas
|
|
|
|
### CronJobRepository — metody kluczowe
|
|
- `enqueue($jobType, $payload, $priority, $maxAttempts, $scheduledAt)` — dodaj do kolejki
|
|
- `fetchNext($limit)` — atomowe pobranie pending jobs (UPDATE WHERE status='pending')
|
|
- `markCompleted($jobId, $result)` / `markFailed($jobId, $error, $backoffSeconds)`
|
|
- `hasPendingJob($jobType, $payloadMatch)` — zapobiega duplikatom
|
|
- `cleanup($olderThanDays)` — GC starych wpisów
|
|
- `recoverStuck($olderThanMinutes)` — reset stuck "processing" jobs
|
|
- `getDueSchedules()` / `touchSchedule($id)` — harmonogram
|
|
|
|
### CronJobProcessor — orkiestracja
|
|
- `registerHandler($jobType, callable)` — rejestracja handlera
|
|
- `createScheduledJobs()` — tworzy jobs z harmonogramów których `next_run_at <= NOW`
|
|
- `processQueue($limit)` — pobierz + wywołaj handler + markCompleted/markFailed
|
|
- `run($limit)` — główna metoda: schedules + process
|
|
|
|
### Exponential backoff
|
|
```
|
|
Próba 1: 60s, Próba 2: 120s, Próba 3: 240s, ... max 3600s (1h)
|
|
```
|
|
|
|
### Zależność "order not yet in Apilo"
|
|
Handler `apilo_sync_payment`/`apilo_sync_status` sprawdza `apilo_order_id`. Jeśli brak → zwraca false → `markFailed()` z backoffem → zadanie wraca do kolejki. Max 50 prób.
|
|
|
|
## Nowy cron.php (schemat)
|
|
|
|
```php
|
|
$cronRepo = new \Domain\CronJob\CronJobRepository($mdb);
|
|
$processor = new \Domain\CronJob\CronJobProcessor($mdb, $cronRepo);
|
|
|
|
// Rejestracja handlerów (każdy to callable)
|
|
$processor->registerHandler('apilo_token_keepalive', function($payload) use ($integrationsRepo) { ... });
|
|
$processor->registerHandler('apilo_send_order', function($payload) use ($orderService, ...) { ... });
|
|
// ... inne handlery
|
|
|
|
$result = $processor->run(20);
|
|
```
|
|
|
|
## Zmiany w OrderAdminService
|
|
|
|
1. `queueApiloSync()` → `CronJobRepository::enqueue()` zamiast zapisu do pliku JSON
|
|
2. Usunięcie: `loadApiloSyncQueue()`, `saveApiloSyncQueue()`, `apiloSyncQueuePath()`, stała `APILO_SYNC_QUEUE_FILE`
|
|
3. `syncApiloPayment()`, `syncApiloStatus()` → zmiana z `private` na `public`
|
|
4. Jednorazowa migracja: odczyt JSON → insert do DB → usunięcie pliku
|
|
|
|
## Kolejność implementacji
|
|
|
|
1. Migracja SQL
|
|
2. `CronJobType.php`
|
|
3. `CronJobRepository.php` + testy
|
|
4. `CronJobProcessor.php` + testy
|
|
5. Modyfikacja `OrderAdminService` (queue → DB, public methods)
|
|
6. Jednorazowa migracja pliku JSON → DB
|
|
7. Nowy `cron.php` z handlerami (ekstrakcja logiki z bloków proceduralnych)
|
|
8. Aktualizacja testów OrderAdminService
|
|
9. Dokumentacja (DATABASE_STRUCTURE.md, CHANGELOG.md)
|
|
|
|
## Weryfikacja
|
|
|
|
1. Uruchomienie pełnego zestawu testów: `./test.ps1`
|
|
2. Sprawdzenie czy nowe testy CronJob* przechodzą
|
|
3. Sprawdzenie czy istniejące testy OrderAdminService przechodzą po refaktorze
|
|
4. Weryfikacja migracji SQL na pustej bazie
|