Async/Tasks/src/Supervisors/ParallelSuperVisor.php

211 lines
7.3 KiB
PHP

<?php
/**
* FuzeWorks CLIComponent.
*
* The FuzeWorks PHP FrameWork
*
* Copyright (C) 2013-2019 TechFuze
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* @author TechFuze
* @copyright Copyright (c) 2013 - 2019, TechFuze. (http://techfuze.net)
* @license https://opensource.org/licenses/MIT MIT License
*
* @link http://techfuze.net/fuzeworks
* @since Version 1.2.0
*
* @version Version 1.2.0
*/
namespace Application\Library\Tasks\Supervisors;
use Application\Library\Tasks\Executor;
use Application\Library\Tasks\SuperVisor;
use Application\Library\Tasks\Task;
use Application\Library\Tasks\TasksException;
use Application\Library\Tasks\TaskStorage;
class ParallelSuperVisor implements SuperVisor
{
/**
* @var TaskStorage
*/
protected $taskStorage;
/**
* @var Executor
*/
protected $executor;
/**
* @var Task[]
*/
protected $tasks = [];
public function __construct(TaskStorage $taskStorage, Executor $executor, array $options = [])
{
$this->taskStorage = $taskStorage;
$this->executor = $executor;
}
/**
* @inheritDoc
*/
public function cycle(): int
{
// First: if there are no tasks, load them
$this->taskStorage->refreshTasks();
$this->tasks = $this->taskStorage->readTasks();
// If there are still no tasks, nothing is queued, so this cycle can end.
if (empty($this->tasks))
return SuperVisor::FINISHED;
// Check if all tasks are completed
$allCompleted = true;
foreach ($this->tasks as $task)
if ($task->getStatus() !== Task::COMPLETED)
$allCompleted = false;
if ($allCompleted)
return SuperVisor::FINISHED;
for ($i=0;$i<count($this->tasks);$i++)
{
$task = $this->tasks[$i];
dump(date('H:i:s') . ': ' . $task->getId() .'/'.$i . ": state #" . $task->getStatus());
// DELAYED Tasks. If a task is delayed, but the time has passed, mark the task as pending
if ($task->getStatus() === Task::DELAYED && time() < $task->getDelayTime())
$task->setStatus(Task::PENDING);
// FAILED Tasks, check if failed can be made pending using constraints
elseif ($task->getStatus() === Task::FAILED)
$task = $this->testConstraints($task);
// FINISHED/CANCELLED/FAILED Tasks, check if they are old enough for removal
elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED || $task->getStatus() === Task::FAILED)
{
}
// If the task is pending, start if not constrained
elseif ($task->getStatus() === Task::PENDING)
{
// First test for constraints.
$task = $this->testConstraints($task);
// If the task is still PENDING, execute it
if ($task->getStatus() !== Task::PENDING)
continue;
// START THE PROCESS
$task = $this->executor->startTask($task);
$this->taskStorage->modifyTask($task);
}
// Check if tasks are still running
elseif ($task->getStatus() === Task::RUNNING)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task);
$hasOutput = !is_null($output);
if (!$isRunning && !$hasOutput)
$task->setStatus(Task::FAILED);
elseif (!$isRunning && $hasOutput)
{
try {
$task->setOutput($output['output'], $output['errors']);
$task->setStatus($output['statusCode']);
} catch (TasksException $e) {
// Ignore
}
}
// elseif (EXPIRED)
else
continue;
$this->taskStorage->modifyTask($task);
}
// Check if task succeeded, and whether the task must progress to POST or COMPLETED
// @todo Add failed handler
elseif ($task->getStatus() === Task::SUCCESS)
{
// Calling post handler when requested to do so
if ($task->getCallPostHandler() === true || ($task->getCallPostHandlerWhenFailed() === true && $output['statusCode'] === Task::FAILED))
$task = $this->executor->startTask($task, true);
else
$task->setStatus(Task::COMPLETED);
$this->taskStorage->modifyTask($task);
}
// If the task is in post mode, check when that process has ended
elseif ($task->getStatus() === Task::POST)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task);
$hasOutput = isset($output['postStatus']);
if (!$isRunning && !$hasOutput)
$task->setStatus(Task::FAILED);
elseif (!$isRunning && $hasOutput)
{
try {
$task->setPostOutput($output['postOutput'], $output['postErrors']);
$task->setStatus(Task::COMPLETED);
} catch (TasksException $e) {
// Ignore
}
}
else
continue;
$this->taskStorage->modifyTask($task);
}
}
return SuperVisor::RUNNING;
}
private function testConstraints(Task $task): Task
{
$constraints = $task->getConstraints();
foreach ($constraints as $constraint)
{
if ($constraint->intervene($task) && $constraint->blockCode() != 0)
{
$task->setStatus($constraint->blockCode());
if ($constraint->blockCode() === Task::DELAYED)
$task->setDelayTime(time() + $constraint->delayTime());
// Save changes to TaskStorage
$this->taskStorage->modifyTask($task);
}
}
return $task;
}
}