reader = $this->initializeReader(); } public function __destruct() { $this->close(); } public function started(): void { } public function finished(): void { } public function getFileDownloaderConfiguration(): array { return []; } public function getFiletypeExtension(): string { return $this->reader->getFiletypeExtension(); } public function getReader(): stTaskSchedulerImportReaderInterface { return $this->reader; } public function createFileDownloader(): stFileDownloader { $mimeType = sfMimeType::extension2MimeType($this->getFiletypeExtension()); $options = array_merge([ 'http_headers' => [ 'Accept: ' . ($mimeType == 'application/xml' ? $mimeType . ', text/xml' : $mimeType), 'Content-Type: ' . $mimeType, ], ], $this->getFileDownloaderConfiguration()); return new stFileDownloader($options); } final public function count(): int { return $this->reader->count(); } final public function setOffset(int $offset): void { $this->reader->setOffset($offset); } final public function getOffset(): int { return $this->reader->getOffset(); } final public function open(string $file): bool { return $this->reader->open($file); } final public function isValid(): bool { return $this->reader->isValid(); } /** * Zwraca rekord danych wstępnie załadowanych * * @param string $id Id * @param bool $create Tworzy nową instancję w przypadku braku * @return stTaskSchedulerImportPreloadedData */ final public function getPreloadedData(string $id, bool $create = true): ?stTaskSchedulerImportPreloadedData { if (count($this->preloadedData) > 20) { $this->preloadedData = []; } if (!isset($this->preloadedData[$id]) && !array_key_exists($id, $this->preloadedData)) { $data = stTaskSchedulerImportPreloadedDataPeer::retrieveByDataId($id, $this); if ($create && null === $data) { $data = new stTaskSchedulerImportPreloadedData(); $data->setDataId($id); $data->setTaskSchedulerImport($this); } $this->preloadedData[$id] = $data; } return $this->preloadedData[$id]; } final public function execute(): bool { $limit = $this->getLimit(); $ok = true; do { try { $ok = $this->reader->read(function(array $data) { $this->process($data); $this->clearStaticPools(); }); } catch (stTaskSchedulerImportLogException $e) { $this->getLogger()->log($e->getCode(), $e->getMessage(), $e->getMessageParameters()); } usleep($this->getDelay()); $limit--; } while ($limit > 0 && $ok); return $ok; } public function close(): void { if ($this->reader) { $this->reader->close(); } } protected function getCollection(array $data): array { return isset($data[0]) ? $data : [ 0 => $data, ]; } }