Async/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php

286 lines
12 KiB
PHP

<?php
/**
* FuzeWorks Async Library
*
* The FuzeWorks PHP FrameWork
*
* Copyright (C) 2013-2020 TechFuze
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* @author TechFuze
* @copyright Copyright (c) 2013 - 2020, TechFuze. (http://techfuze.net)
* @license https://opensource.org/licenses/MIT MIT License
*
* @link http://techfuze.net/fuzeworks
* @since Version 1.0.0
*
* @version Version 1.0.0
*/
namespace FuzeWorks\Async\Supervisors;
use FuzeWorks\Async\Executor;
use FuzeWorks\Async\SuperVisor;
use FuzeWorks\Async\Task;
use FuzeWorks\Async\TasksException;
use FuzeWorks\Async\TaskStorage;
class ParallelSuperVisor implements SuperVisor
{
/**
* @var TaskStorage
*/
protected $taskStorage;
/**
* @var Executor
*/
protected $executor;
/**
* @var Task[]
*/
protected $tasks = [];
public function __construct(TaskStorage $taskStorage, Executor $executor, array $options = [])
{
$this->taskStorage = $taskStorage;
$this->executor = $executor;
}
/**
* @inheritDoc
*/
public function cycle(): int
{
// First: if there are no tasks, load them
$this->tasks = $this->taskStorage->readTasks(true);
// If there are still no tasks, nothing is queued, so this cycle can end.
if (empty($this->tasks))
return SuperVisor::FINISHED;
for ($i=0;$i<count($this->tasks);$i++)
{
$task = $this->tasks[$i];
// PENDING: should start if not constrained
if ($task->getStatus() === Task::PENDING)
{
// Test if constrained
$task = $this->testConstraints($task);
// If the task changed status, task is no longer pending and should be processed by another statement
if ($task->getStatus() !== Task::PENDING)
continue;
// Start the process using the executor service
$task = $this->executor->startTask($task);
$task->setStatus(Task::RUNNING);
$task->startTaskTime();
// Modify the task in TaskStorage
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
// DELAYED: If task is delayed, and enough time has passed, change the status back to pending
elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime())
{
$task->setStatus(Task::PENDING);
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
// RUNNING: check if task is still running. If not, set result based on output
elseif ($task->getStatus() === Task::RUNNING)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task);
$hasOutput = !is_null($output);
// If nothing is found, the process has crashed and status PFAILED should be set
if (!$isRunning && !$hasOutput)
$task->setStatus(Task::PFAILED);
// @todo Set PFAILED after $max_Time
// If output is found, use the status code from that
elseif (!$isRunning && $hasOutput)
{
try {
$task->setOutput($output['output'], $output['errors']);
$task->setStatus($output['statusCode']);
} catch (TasksException $e) {
// On failure to set output, consider as a process failure
$task->setStatus(Task::PFAILED);
}
}
// In any other situation the process is still running and should be left alone
else
continue;
// If any changes have been made, they should be written to TaskStorage
$task->endTaskTime();
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
// FAILED: if a process has failed, attempt to rety if requested to do so
elseif ($task->getStatus() === Task::PFAILED || $task->getStatus() === Task::FAILED)
{
// First fetch retry conditions
$settings = $task->getSettings();
// First test if any retries should be tried at all
if ($settings['retryOnFail'] === true && $task->getRetries() < $settings['maxRetries'])
{
// Then test if this type of failure should be retried and whether the mexRetries has been exceeded
if (
($task->getStatus() === Task::PFAILED && $settings['retryPFailures'] === true) ||
($task->getStatus() === Task::FAILED && $settings['retryRFailures'] === true)
)
{
// If eligible, reset task to pending
$task->addRetry();
$task = $this->executor->startTask($task);
$task->setStatus(Task::RUNNING);
$task->startTaskTime();
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
continue;
}
}
// If the task is not eligible for a retry, either cancel it or move it to a postHandler
if ($task->getUsePostHandler() === true)
{
$task->resetRetries();
$task = $this->executor->startTask($task, true);
$task->startPostTime();
$task->setStatus(Task::POST);
}
else
$task->setStatus(Task::CANCELLED);
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
// SUCCESS: if a task has succeeded, see if it needs a postHandler
elseif ($task->getStatus() === Task::SUCCESS)
{
if ($task->getUsePostHandler() === true)
{
$task->resetRetries();
$task = $this->executor->startTask($task, true);
$task->startPostTime();
$task->setStatus(Task::POST);
}
else
$task->setStatus(Task::COMPLETED);
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
// POST: when a task is currently running in it's postHandler
elseif ($task->getStatus() === Task::POST)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readPostOutput($task);
$hasOutput = !is_null($output);
// If a task is not running and has no output, an error has occurred
if (!$isRunning && !$hasOutput)
{
// Test if a retry should be attempted
$settings = $task->getSettings();
if ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] > $task->getRetries())
{
$task->addRetry();
$task = $this->executor->startTask($task, true);
}
elseif ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] <= $task->getRetries())
$task->setStatus(Task::CANCELLED);
else
$task->setStatus(Task::CANCELLED);
}
// @todo Retry after $max_Time
// If a task is not running and has output, set that output and mark as completed
elseif (!$isRunning && $hasOutput)
{
$task->setPostOutput($output['output'], $output['errors']);
if ($output['statusCode'] === Task::SUCCESS)
$task->setStatus(Task::COMPLETED);
else
$task->setStatus(Task::CANCELLED);
}
// If the task is still running, leave it be
else
continue;
// If any changes have been made, they should be written to TaskStorage
$task->endPostTime();
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
}
// Check if all tasks are completed
$allCompleted = true;
$anyDelayed = false;
foreach ($this->tasks as $task)
{
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
$allCompleted = false;
elseif ($task->getStatus() === Task::DELAYED)
$anyDelayed = true;
}
// If all are finished and none are delayed
if ($allCompleted && !$anyDelayed)
return SuperVisor::FINISHED;
if ($allCompleted && $anyDelayed)
return SuperVisor::CONSTRAINED;
else
return SuperVisor::RUNNING;
}
private function testConstraints(Task $task): Task
{
$constraints = $task->getConstraints();
foreach ($constraints as $constraint)
{
if ($constraint->intervene($task) && $constraint->blockCode() != 0)
{
$task->setStatus($constraint->blockCode());
if ($constraint->blockCode() === Task::DELAYED)
$task->setDelayTime($constraint->delayTime());
// Save changes to TaskStorage
$this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
}
}
return $task;
}
}