164 lines
3.9 KiB
PHP
164 lines
3.9 KiB
PHP
<?php
|
|
|
|
/**
 * Base class for task-scheduler imports that consume their input through a reader object.
 *
 * Counting, offset handling, opening, validation and closing are all delegated to the
 * reader obtained from initializeReader() in the constructor.
 *
 * NOTE(review): initializeReader(), process(), clearStaticPools(), getLimit(), getDelay()
 * and getLogger() are not defined in this file; they are presumably provided by the parent
 * class or by concrete subclasses - confirm before relying on their exact contracts.
 */
abstract class stTaskSchedulerImportAbstract extends stTaskSchedulerImportBaseAbstract implements stTaskSchedulerImportInterface
{
    /**
     * In-memory cache of preloaded data records, keyed by data id.
     *
     * An entry may hold null when a lookup found no stored record and creation
     * was not requested (see getPreloadedData()).
     *
     * @var stTaskSchedulerImportPreloadedData[]
     */
    protected $preloadedData = [];

    /**
     * Reader that all file-access methods of this class delegate to.
     *
     * @var stTaskSchedulerImportReaderInterface
     */
    private $reader;

    /**
     * Forwards both arguments to the parent constructor and then sets up the reader.
     *
     * @param stTask $task Passed through to the parent constructor
     * @param stTaskSchedulerImportConfigurationInterface $configuration Passed through to the parent constructor
     */
    public function __construct(stTask $task, stTaskSchedulerImportConfigurationInterface $configuration)
    {
        parent::__construct($task, $configuration);

        $this->reader = $this->initializeReader();
    }

    /**
     * Closes the reader when the import object is destroyed.
     */
    public function __destruct()
    {
        $this->close();
    }

    /**
     * Intentionally empty in this class; subclasses may override it.
     *
     * NOTE(review): presumably invoked by the scheduler when the import starts - the
     * caller is not visible in this file.
     */
    public function started(): void
    {
    }

    /**
     * Intentionally empty in this class; subclasses may override it.
     *
     * NOTE(review): presumably invoked by the scheduler when the import ends - the
     * caller is not visible in this file.
     */
    public function finished(): void
    {
    }

    /**
     * Returns extra options for the file downloader; none by default.
     *
     * The returned array is merged over the default options in createFileDownloader(),
     * so subclasses can override this method to add or replace options.
     *
     * @return array
     */
    public function getFileDownloaderConfiguration(): array
    {
        return [];
    }

    /**
     * Returns the file type extension reported by the reader.
     *
     * @return string
     */
    public function getFiletypeExtension(): string
    {
        return $this->reader->getFiletypeExtension();
    }

    /**
     * Returns the reader created in the constructor.
     *
     * @return stTaskSchedulerImportReaderInterface
     */
    public function getReader(): stTaskSchedulerImportReaderInterface
    {
        return $this->reader;
    }

    /**
     * Builds a file downloader with Accept / Content-Type headers derived from the
     * reader's file type extension.
     *
     * The defaults are combined with getFileDownloaderConfiguration() using array_merge(),
     * which is a shallow merge: an 'http_headers' entry supplied by the configuration
     * replaces the default header list as a whole rather than being appended to it.
     *
     * @return stFileDownloader
     */
    public function createFileDownloader(): stFileDownloader
    {
        $mimeType = sfMimeType::extension2MimeType($this->getFiletypeExtension());

        // For application/xml the text/xml type is accepted as well.
        $accept = 'application/xml' === $mimeType ? $mimeType . ', text/xml' : $mimeType;

        $defaults = [
            'http_headers' => [
                'Accept: ' . $accept,
                'Content-Type: ' . $mimeType,
            ],
        ];

        $options = array_merge($defaults, $this->getFileDownloaderConfiguration());

        return new stFileDownloader($options);
    }

    /**
     * Returns the count reported by the reader.
     *
     * @return int
     */
    final public function count(): int
    {
        return $this->reader->count();
    }

    /**
     * Passes the given offset on to the reader.
     *
     * @param int $offset Offset to set on the reader
     */
    final public function setOffset(int $offset): void
    {
        $this->reader->setOffset($offset);
    }

    /**
     * Returns the reader's current offset.
     *
     * @return int
     */
    final public function getOffset(): int
    {
        return $this->reader->getOffset();
    }

    /**
     * Asks the reader to open the given file.
     *
     * @param string $file File handed to the reader
     * @return bool Result reported by the reader
     */
    final public function open(string $file): bool
    {
        return $this->reader->open($file);
    }

    /**
     * Returns the reader's validity check result.
     *
     * @return bool
     */
    final public function isValid(): bool
    {
        return $this->reader->isValid();
    }

    /**
     * Returns a preloaded data record.
     *
     * Results are cached per id. When the cache already holds more than 20 entries it is
     * emptied completely before the lookup. A miss (null) is cached as well so that a
     * repeated lookup with $create = false does not hit the storage again; a later call
     * with $create = true still creates and caches a fresh record for that id.
     *
     * @param string $id Id
     * @param bool $create Creates a new instance when none exists
     * @return stTaskSchedulerImportPreloadedData|null The record, or null when none exists and $create is false
     */
    final public function getPreloadedData(string $id, bool $create = true): ?stTaskSchedulerImportPreloadedData
    {
        // Crude bound on cache growth: drop everything once the limit is exceeded.
        if (count($this->preloadedData) > 20)
        {
            $this->preloadedData = [];
        }

        // isset() is the fast path; array_key_exists() additionally recognises cached misses (null).
        if (!isset($this->preloadedData[$id]) && !array_key_exists($id, $this->preloadedData))
        {
            $this->preloadedData[$id] = stTaskSchedulerImportPreloadedDataPeer::retrieveByDataId($id, $this);
        }

        // Also covers a miss cached by an earlier call made with $create = false.
        if ($create && null === $this->preloadedData[$id])
        {
            $data = new stTaskSchedulerImportPreloadedData();
            $data->setDataId($id);
            $data->setTaskSchedulerImport($this);

            $this->preloadedData[$id] = $data;
        }

        return $this->preloadedData[$id];
    }

    /**
     * Runs the import by calling the reader's read() repeatedly.
     *
     * read() is called at least once and at most getLimit() times; the loop stops early
     * as soon as read() returns a falsy result. Every data array the reader passes to the
     * callback is handed to process(), followed by clearStaticPools(). After each
     * iteration, including the last one, execution pauses for getDelay() microseconds.
     *
     * A stTaskSchedulerImportLogException thrown during read() is written to the logger
     * and does not stop the loop; the result of the previous read() is kept in that case.
     *
     * @return bool Result of the most recent read() call that returned without throwing
     *              (true when no call has returned yet)
     */
    final public function execute(): bool
    {
        $limit = $this->getLimit();
        $ok = true;

        do
        {
            try
            {
                $ok = $this->reader->read(function(array $data) {
                    $this->process($data);
                    $this->clearStaticPools();
                });
            }
            catch (stTaskSchedulerImportLogException $e)
            {
                $this->getLogger()->log($e->getCode(), $e->getMessage(), $e->getMessageParameters());
            }

            usleep($this->getDelay());

            $limit--;
        }
        while ($limit > 0 && $ok);

        return $ok;
    }

    /**
     * Closes the reader, if one has been set.
     *
     * The guard matters because the destructor calls this method even when the
     * constructor did not get as far as assigning the reader.
     */
    public function close(): void
    {
        if ($this->reader)
        {
            $this->reader->close();
        }
    }

    /**
     * Normalises the given data to a list of records.
     *
     * When index 0 is set the array is returned unchanged; otherwise the array is treated
     * as a single record and wrapped in a one-element list.
     *
     * Note: isset() is used for the check, so an empty array, or an array whose element 0
     * is null, is wrapped as well.
     *
     * @param array $data Either a list of records or a single record
     * @return array
     */
    protected function getCollection(array $data): array
    {
        return isset($data[0]) ? $data : [
            0 => $data,
        ];
    }
}