Compare commits
5 Commits
master
...
finetuning
Author | SHA1 | Date | |
---|---|---|---|
5d306dac51 | |||
80df1dd33e | |||
fcda6d6e4d | |||
be3d5d56e8 | |||
4758f4154b |
@ -109,12 +109,17 @@ try {
|
|||||||
// And finally, run the supervisor
|
// And finally, run the supervisor
|
||||||
try {
|
try {
|
||||||
$supervisor = $lib->getSuperVisor($bootstrap);
|
$supervisor = $lib->getSuperVisor($bootstrap);
|
||||||
while ($supervisor->cycle() !== SuperVisor::RUNNING) {
|
$res = SuperVisor::RUNNING;
|
||||||
|
while ($res === SuperVisor::RUNNING) {
|
||||||
|
$res = $supervisor->cycle();
|
||||||
usleep(250000);
|
usleep(250000);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write results
|
// Write results
|
||||||
fwrite(STDOUT, "SuperVisor finished scheduled tasks.");
|
if ($res === SuperVisor::CONSTRAINED)
|
||||||
|
fwrite(STDOUT, "SuperVisor finished due to constrained tasks." . PHP_EOL);
|
||||||
|
else
|
||||||
|
fwrite(STDOUT, "SuperVisor finished due to finishing all tasks." . PHP_EOL);
|
||||||
} catch (InvalidArgumentException | TasksException | LibraryException $e) {
|
} catch (InvalidArgumentException | TasksException | LibraryException $e) {
|
||||||
fwrite(STDERR, sprintf('FuzeWorks Async could not load.' . PHP_EOL .
|
fwrite(STDERR, sprintf('FuzeWorks Async could not load.' . PHP_EOL .
|
||||||
'Exception: ' . $e->getMessage() . PHP_EOL)
|
'Exception: ' . $e->getMessage() . PHP_EOL)
|
||||||
|
@ -44,9 +44,10 @@ interface Constraint
|
|||||||
* Return true ONLY when this constraint changes the execution of the Task. Otherwise return false.
|
* Return true ONLY when this constraint changes the execution of the Task. Otherwise return false.
|
||||||
*
|
*
|
||||||
* @param Task $task
|
* @param Task $task
|
||||||
|
* @param Task[] $tasks
|
||||||
* @return bool
|
* @return bool
|
||||||
*/
|
*/
|
||||||
public function intervene(Task $task): bool;
|
public function intervene(Task $task, array $tasks): bool;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When intervene() returns true, this method should return the new status of the task.
|
* When intervene() returns true, this method should return the new status of the task.
|
||||||
|
@ -84,12 +84,12 @@ class DependencyConstraint implements Constraint
|
|||||||
/**
|
/**
|
||||||
* @inheritDoc
|
* @inheritDoc
|
||||||
*/
|
*/
|
||||||
public function intervene(Task $task): bool
|
public function intervene(Task $task, array $tasks): bool
|
||||||
{
|
{
|
||||||
// Fetch taskStorage
|
// Fetch taskStorage
|
||||||
try {
|
try {
|
||||||
$tasks = $this->loadTasksLib();
|
$tasksLib = $this->loadTasksLib();
|
||||||
$taskStorage = $tasks->getTaskStorage();
|
$taskStorage = $tasksLib->getTaskStorage();
|
||||||
|
|
||||||
// Is any dependency unresolved?
|
// Is any dependency unresolved?
|
||||||
$hasUnresolved = false;
|
$hasUnresolved = false;
|
||||||
|
@ -70,7 +70,7 @@ class FixedTimeConstraint implements Constraint
|
|||||||
/**
|
/**
|
||||||
* @inheritDoc
|
* @inheritDoc
|
||||||
*/
|
*/
|
||||||
public function intervene(Task $task): bool
|
public function intervene(Task $task, array $tasks): bool
|
||||||
{
|
{
|
||||||
if ($task->getStatus() === Task::PENDING && time() < $this->timestamp)
|
if ($task->getStatus() === Task::PENDING && time() < $this->timestamp)
|
||||||
return true;
|
return true;
|
||||||
|
124
src/FuzeWorks/Async/Constraint/GroupConstraint.php
Normal file
124
src/FuzeWorks/Async/Constraint/GroupConstraint.php
Normal file
@ -0,0 +1,124 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* FuzeWorks Component.
|
||||||
|
*
|
||||||
|
* The FuzeWorks PHP FrameWork
|
||||||
|
*
|
||||||
|
* Copyright (C) 2013-2019 TechFuze
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to deal
|
||||||
|
* in the Software without restriction, including without limitation the rights
|
||||||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in all
|
||||||
|
* copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
* SOFTWARE.
|
||||||
|
*
|
||||||
|
* @author TechFuze
|
||||||
|
* @copyright Copyright (c) 2013 - 2019, TechFuze. (http://techfuze.net)
|
||||||
|
* @license https://opensource.org/licenses/MIT MIT License
|
||||||
|
*
|
||||||
|
* @link http://techfuze.net/fuzeworks
|
||||||
|
* @since Version 1.2.0
|
||||||
|
*
|
||||||
|
* @version Version 1.2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace FuzeWorks\Async\Constraint;
|
||||||
|
use FuzeWorks\Async\Constraint;
|
||||||
|
use FuzeWorks\Async\Task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class GroupConstraint
|
||||||
|
*
|
||||||
|
* Use this constraint to maximize the amount of concurrent tasks within a certain group.
|
||||||
|
* If the amount of running tasks equals the maximum concurrent tasks, this constraint will issue a delay.
|
||||||
|
* Useful when multiple tasks are not dependent on each other, but should be run separately.
|
||||||
|
*
|
||||||
|
* @package FuzeWorks\Async\Constraint
|
||||||
|
*/
|
||||||
|
class GroupConstraint implements Constraint
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $groupName;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var int
|
||||||
|
*/
|
||||||
|
protected $maxConcurrent = 3;
|
||||||
|
|
||||||
|
protected $returnStatus = Task::DELAYED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GroupConstraint constructor.
|
||||||
|
*
|
||||||
|
* To add a group, simply provide the $groupName and the maximum of concurrent tasks within this group.
|
||||||
|
*
|
||||||
|
* @param string $groupName
|
||||||
|
* @param int $maxConcurrent
|
||||||
|
*/
|
||||||
|
public function __construct(string $groupName, int $maxConcurrent = 3)
|
||||||
|
{
|
||||||
|
$this->groupName = $groupName;
|
||||||
|
$this->maxConcurrent = $maxConcurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function init(Task $task)
|
||||||
|
{
|
||||||
|
$task->addAttribute('groupName', $this->groupName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getGroupName(): string
|
||||||
|
{
|
||||||
|
return $this->groupName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function intervene(Task $task, array $tasks): bool
|
||||||
|
{
|
||||||
|
// Find all tasks within this group and that are running
|
||||||
|
$runningTasks = 0;
|
||||||
|
foreach ($tasks as $t)
|
||||||
|
{
|
||||||
|
foreach ($t->getConstraints() as $constraint)
|
||||||
|
{
|
||||||
|
if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName())
|
||||||
|
{
|
||||||
|
if ($t->getStatus() === Task::RUNNING || $t->getStatus() === Task::POST)
|
||||||
|
$runningTasks++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the running tasks exceeds the maximum, block this task from running
|
||||||
|
if ($runningTasks >= $this->maxConcurrent)
|
||||||
|
{
|
||||||
|
$this->returnStatus = Task::DELAYED;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function blockCode(): int
|
||||||
|
{
|
||||||
|
return $this->returnStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
public function delayTime(): int
|
||||||
|
{
|
||||||
|
return time() + 1;
|
||||||
|
}
|
||||||
|
}
|
133
src/FuzeWorks/Async/Constraint/GroupDependencyConstraint.php
Normal file
133
src/FuzeWorks/Async/Constraint/GroupDependencyConstraint.php
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
<?php
|
||||||
|
/**
|
||||||
|
* FuzeWorks Component.
|
||||||
|
*
|
||||||
|
* The FuzeWorks PHP FrameWork
|
||||||
|
*
|
||||||
|
* Copyright (C) 2013-2019 TechFuze
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to deal
|
||||||
|
* in the Software without restriction, including without limitation the rights
|
||||||
|
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
* copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in all
|
||||||
|
* copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
* SOFTWARE.
|
||||||
|
*
|
||||||
|
* @author TechFuze
|
||||||
|
* @copyright Copyright (c) 2013 - 2019, TechFuze. (http://techfuze.net)
|
||||||
|
* @license https://opensource.org/licenses/MIT MIT License
|
||||||
|
*
|
||||||
|
* @link http://techfuze.net/fuzeworks
|
||||||
|
* @since Version 1.2.0
|
||||||
|
*
|
||||||
|
* @version Version 1.2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
namespace FuzeWorks\Async\Constraint;
|
||||||
|
use FuzeWorks\Async\Constraint;
|
||||||
|
use FuzeWorks\Async\Task;
|
||||||
|
use FuzeWorks\Async\Tasks;
|
||||||
|
use FuzeWorks\Async\TasksException;
|
||||||
|
use FuzeWorks\Exception\FactoryException;
|
||||||
|
use FuzeWorks\Exception\LibraryException;
|
||||||
|
use FuzeWorks\Factory;
|
||||||
|
use FuzeWorks\Libraries;
|
||||||
|
|
||||||
|
class GroupDependencyConstraint implements Constraint
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @var string
|
||||||
|
*/
|
||||||
|
protected $groupName;
|
||||||
|
|
||||||
|
protected $returnStatus = Task::DELAYED;
|
||||||
|
|
||||||
|
public function __construct(string $groupName)
|
||||||
|
{
|
||||||
|
$this->groupName = $groupName;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @inheritDoc
|
||||||
|
*/
|
||||||
|
public function intervene(Task $task, array $tasks): bool
|
||||||
|
{
|
||||||
|
// Find whether any task is not completed
|
||||||
|
foreach ($tasks as $t)
|
||||||
|
{
|
||||||
|
foreach ($t->getConstraints() as $constraint)
|
||||||
|
{
|
||||||
|
// Check whether the constraint is a GroupConstraint
|
||||||
|
if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName())
|
||||||
|
{
|
||||||
|
$this->returnStatus = Task::DELAYED;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all known tasks have run, check if all have ended without a CANCELLED status
|
||||||
|
$taskStorage = $this->loadTasksLib()->getTaskStorage();
|
||||||
|
$tasks = $taskStorage->getTasksByAttribute('groupName', $this->groupName);
|
||||||
|
foreach ($tasks as $t)
|
||||||
|
{
|
||||||
|
if ($t->getStatus() === Task::CANCELLED)
|
||||||
|
{
|
||||||
|
$this->returnStatus = Task::CANCELLED;
|
||||||
|
$task->setOutput('', 'Task cancelled due to failed group dependency.');
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @inheritDoc
|
||||||
|
*/
|
||||||
|
public function blockCode(): int
|
||||||
|
{
|
||||||
|
return $this->returnStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @inheritDoc
|
||||||
|
*/
|
||||||
|
public function delayTime(): int
|
||||||
|
{
|
||||||
|
return time() + 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the tasks library, so that dependencies can get scanned later
|
||||||
|
*
|
||||||
|
* @return Tasks
|
||||||
|
* @throws TasksException
|
||||||
|
*/
|
||||||
|
private function loadTasksLib(): Tasks
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
/** @var Libraries $libraries */
|
||||||
|
$libraries = Factory::getInstance('libraries');
|
||||||
|
|
||||||
|
/** @var Tasks $tasks */
|
||||||
|
$tasks = $libraries->get('async');
|
||||||
|
|
||||||
|
return $tasks;
|
||||||
|
} catch (FactoryException | LibraryException $e) {
|
||||||
|
throw new TasksException("Could not constrain task. Async library could not be loaded.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -59,13 +59,23 @@ class ShellExecutor implements Executor
|
|||||||
if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile']))
|
if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile']))
|
||||||
throw new TasksException("Could not construct ShellExecutor. Parameter failure.");
|
throw new TasksException("Could not construct ShellExecutor. Parameter failure.");
|
||||||
|
|
||||||
|
// Determine alternative worker file
|
||||||
|
$alternative = dirname(__DIR__, 4) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker';
|
||||||
|
|
||||||
// Fetch workerFile
|
// Fetch workerFile
|
||||||
$this->worker = $parameters['workerFile'];
|
$workerFile = null;
|
||||||
if (!file_exists($this->worker))
|
if (file_exists($parameters['workerFile']))
|
||||||
|
$workerFile = $parameters['workerFile'];
|
||||||
|
elseif (file_exists($alternative))
|
||||||
|
$workerFile = $alternative;
|
||||||
|
|
||||||
|
// If no workerFile is found, throw an exception
|
||||||
|
if (is_null($workerFile))
|
||||||
throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist.");
|
throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist.");
|
||||||
|
|
||||||
// First determine the PHP binary
|
// First determine the PHP binary
|
||||||
$this->binary = PHP_BINDIR . DS . 'php';
|
$this->binary = PHP_BINDIR . DS . 'php';
|
||||||
|
$this->worker = $workerFile;
|
||||||
$this->bootstrapFile = $parameters['bootstrapFile'];
|
$this->bootstrapFile = $parameters['bootstrapFile'];
|
||||||
if (!file_exists($this->bootstrapFile))
|
if (!file_exists($this->bootstrapFile))
|
||||||
throw new TasksException("Could not construct ShellExecutor. No bootstrap file found.");
|
throw new TasksException("Could not construct ShellExecutor. No bootstrap file found.");
|
||||||
|
@ -149,6 +149,9 @@ class ControllerHandler implements Handler
|
|||||||
*/
|
*/
|
||||||
public function getOutput(): string
|
public function getOutput(): string
|
||||||
{
|
{
|
||||||
|
if (!is_string($this->output))
|
||||||
|
return '';
|
||||||
|
|
||||||
return $this->output;
|
return $this->output;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,6 +192,9 @@ class ControllerHandler implements Handler
|
|||||||
*/
|
*/
|
||||||
public function getPostOutput(): string
|
public function getPostOutput(): string
|
||||||
{
|
{
|
||||||
|
if (!is_string($this->postOutput))
|
||||||
|
return '';
|
||||||
|
|
||||||
return $this->postOutput;
|
return $this->postOutput;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,6 +62,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
{
|
{
|
||||||
$this->taskStorage = $taskStorage;
|
$this->taskStorage = $taskStorage;
|
||||||
$this->executor = $executor;
|
$this->executor = $executor;
|
||||||
|
gc_enable();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -88,7 +89,10 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
|
|
||||||
// If the task changed status, task is no longer pending and should be processed by another statement
|
// If the task changed status, task is no longer pending and should be processed by another statement
|
||||||
if ($task->getStatus() !== Task::PENDING)
|
if ($task->getStatus() !== Task::PENDING)
|
||||||
|
{
|
||||||
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Start the process using the executor service
|
// Start the process using the executor service
|
||||||
$task = $this->executor->startTask($task);
|
$task = $this->executor->startTask($task);
|
||||||
@ -97,15 +101,16 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
|
|
||||||
// Modify the task in TaskStorage
|
// Modify the task in TaskStorage
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// DELAYED: If task is delayed, and enough time has passed, change the status back to pending
|
// DELAYED: If task is delayed, and enough time has passed, change the status back to pending
|
||||||
|
// @todo Don't set to pending if still constrained
|
||||||
elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime())
|
elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime())
|
||||||
{
|
{
|
||||||
$task->setStatus(Task::PENDING);
|
$task->setStatus(Task::PENDING);
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// RUNNING: check if task is still running. If not, set result based on output
|
// RUNNING: check if task is still running. If not, set result based on output
|
||||||
@ -137,7 +142,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
// If any changes have been made, they should be written to TaskStorage
|
// If any changes have been made, they should be written to TaskStorage
|
||||||
$task->endTaskTime();
|
$task->endTaskTime();
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// FAILED: if a process has failed, attempt to rety if requested to do so
|
// FAILED: if a process has failed, attempt to rety if requested to do so
|
||||||
@ -161,7 +166,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
$task->setStatus(Task::RUNNING);
|
$task->setStatus(Task::RUNNING);
|
||||||
$task->startTaskTime();
|
$task->startTaskTime();
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -178,7 +183,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
$task->setStatus(Task::CANCELLED);
|
$task->setStatus(Task::CANCELLED);
|
||||||
|
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// SUCCESS: if a task has succeeded, see if it needs a postHandler
|
// SUCCESS: if a task has succeeded, see if it needs a postHandler
|
||||||
@ -195,7 +200,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
$task->setStatus(Task::COMPLETED);
|
$task->setStatus(Task::COMPLETED);
|
||||||
|
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
// POST: when a task is currently running in it's postHandler
|
// POST: when a task is currently running in it's postHandler
|
||||||
@ -237,37 +242,43 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
// If any changes have been made, they should be written to TaskStorage
|
// If any changes have been made, they should be written to TaskStorage
|
||||||
$task->endPostTime();
|
$task->endPostTime();
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if all tasks are completed
|
// Check if all tasks are completed
|
||||||
$allCompleted = true;
|
|
||||||
$anyDelayed = false;
|
$anyDelayed = false;
|
||||||
|
$anyOther = false;
|
||||||
foreach ($this->tasks as $task)
|
foreach ($this->tasks as $task)
|
||||||
{
|
{
|
||||||
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
|
if ($task->getStatus() === Task::DELAYED)
|
||||||
$allCompleted = false;
|
|
||||||
elseif ($task->getStatus() === Task::DELAYED)
|
|
||||||
$anyDelayed = true;
|
$anyDelayed = true;
|
||||||
|
elseif ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
|
||||||
|
$anyOther = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If all are finished and none are delayed
|
// If all are finished and none are delayed
|
||||||
if ($allCompleted && !$anyDelayed)
|
if (!$anyOther && !$anyDelayed)
|
||||||
return SuperVisor::FINISHED;
|
return SuperVisor::FINISHED;
|
||||||
if ($allCompleted && $anyDelayed)
|
elseif (!$anyOther && $anyDelayed)
|
||||||
return SuperVisor::CONSTRAINED;
|
return SuperVisor::CONSTRAINED;
|
||||||
else
|
else
|
||||||
return SuperVisor::RUNNING;
|
return SuperVisor::RUNNING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @todo Here it saves the constraint code regardless of whether there is a change. Fix!
|
||||||
|
*
|
||||||
|
* @param Task $task
|
||||||
|
* @return Task
|
||||||
|
* @throws TasksException
|
||||||
|
*/
|
||||||
private function testConstraints(Task $task): Task
|
private function testConstraints(Task $task): Task
|
||||||
{
|
{
|
||||||
$constraints = $task->getConstraints();
|
$constraints = $task->getConstraints();
|
||||||
foreach ($constraints as $constraint)
|
foreach ($constraints as $constraint)
|
||||||
{
|
{
|
||||||
if ($constraint->intervene($task) && $constraint->blockCode() != 0)
|
if ($constraint->intervene($task, $this->tasks) && $constraint->blockCode() != 0)
|
||||||
{
|
{
|
||||||
$task->setStatus($constraint->blockCode());
|
$task->setStatus($constraint->blockCode());
|
||||||
if ($constraint->blockCode() === Task::DELAYED)
|
if ($constraint->blockCode() === Task::DELAYED)
|
||||||
@ -275,7 +286,6 @@ class ParallelSuperVisor implements SuperVisor
|
|||||||
|
|
||||||
// Save changes to TaskStorage
|
// Save changes to TaskStorage
|
||||||
$this->taskStorage->modifyTask($task);
|
$this->taskStorage->modifyTask($task);
|
||||||
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,6 +262,9 @@ class Task
|
|||||||
*/
|
*/
|
||||||
public function addConstraint(Constraint $constraint)
|
public function addConstraint(Constraint $constraint)
|
||||||
{
|
{
|
||||||
|
if (method_exists($constraint, 'init'))
|
||||||
|
$constraint->init($this);
|
||||||
|
|
||||||
$this->constraints[] = $constraint;
|
$this->constraints[] = $constraint;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -319,6 +322,16 @@ class Task
|
|||||||
|
|
||||||
/* ---------------------------------- Attributes setters and getters ------------------ */
|
/* ---------------------------------- Attributes setters and getters ------------------ */
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve every attribute of this Task
|
||||||
|
*
|
||||||
|
* @return array
|
||||||
|
*/
|
||||||
|
public function getAttributes(): array
|
||||||
|
{
|
||||||
|
return $this->attributes;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch an attribute of this task
|
* Fetch an attribute of this task
|
||||||
*
|
*
|
||||||
|
@ -74,6 +74,15 @@ interface TaskStorage
|
|||||||
*/
|
*/
|
||||||
public function getTaskById(string $identifier): Task;
|
public function getTaskById(string $identifier): Task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve a list of all tasks that have a certain attribute
|
||||||
|
*
|
||||||
|
* @param string $attributeKey
|
||||||
|
* @param string $attributeValue
|
||||||
|
* @return Task[]
|
||||||
|
*/
|
||||||
|
public function getTasksByAttribute(string $attributeKey, string $attributeValue): array;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Modifies a task
|
* Modifies a task
|
||||||
*
|
*
|
||||||
|
@ -54,6 +54,8 @@ class RedisTaskStorage implements TaskStorage
|
|||||||
|
|
||||||
protected $indexSet = 'async_index';
|
protected $indexSet = 'async_index';
|
||||||
protected $unfinishedSet = 'async_index_unfinished';
|
protected $unfinishedSet = 'async_index_unfinished';
|
||||||
|
protected $attributesSet = 'async_attributes';
|
||||||
|
protected $attributesSetPrefix = 'async_attributes_';
|
||||||
protected $key_prefix = 'async_task_';
|
protected $key_prefix = 'async_task_';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -115,6 +117,22 @@ class RedisTaskStorage implements TaskStorage
|
|||||||
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
// And create the attributes sets
|
||||||
|
foreach ($task->getAttributes() as $key => $val)
|
||||||
|
{
|
||||||
|
// If the attribute value is not a string it can't be used as a searchable attribute
|
||||||
|
if (!is_string($val))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// And finally add it to the set
|
||||||
|
$setKey = $this->attributesSetPrefix . $key . '_' . $val;
|
||||||
|
$this->conn->sAdd($setKey, $taskId);
|
||||||
|
|
||||||
|
// And add the attribute combo to the attributesSet
|
||||||
|
if (!$this->conn->sIsMember($this->attributesSet, $key . '_' . $val))
|
||||||
|
$this->conn->sAdd($this->attributesSet, $key . '_' . $val);
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,6 +173,22 @@ class RedisTaskStorage implements TaskStorage
|
|||||||
return $task;
|
return $task;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @inheritDoc
|
||||||
|
*/
|
||||||
|
public function getTasksByAttribute(string $attributeKey, string $attributeValue): array
|
||||||
|
{
|
||||||
|
// Fetch the taskList by the attribute
|
||||||
|
$setKey = $this->attributesSetPrefix . $attributeKey . '_' . $attributeValue;
|
||||||
|
$taskList = $this->conn->sMembers($setKey);
|
||||||
|
|
||||||
|
$tasks = [];
|
||||||
|
foreach ($taskList as $taskId)
|
||||||
|
$tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data'));
|
||||||
|
|
||||||
|
return $tasks;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @inheritDoc
|
* @inheritDoc
|
||||||
*/
|
*/
|
||||||
@ -182,6 +216,29 @@ class RedisTaskStorage implements TaskStorage
|
|||||||
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
// Delete all the attributes
|
||||||
|
$allLoggedAttributes = $this->conn->sMembers($this->attributesSet);
|
||||||
|
foreach ($allLoggedAttributes as $attribute)
|
||||||
|
{
|
||||||
|
if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId))
|
||||||
|
$this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId);
|
||||||
|
}
|
||||||
|
|
||||||
|
// And write all the attributes
|
||||||
|
foreach ($task->getAttributes() as $key => $val)
|
||||||
|
{
|
||||||
|
// If the attribute value is not a string it can't be used as a searchable attribute
|
||||||
|
if (!is_string($val))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// And finally add it to the set
|
||||||
|
$setKey = $this->attributesSetPrefix . $key . '_' . $val;
|
||||||
|
$this->conn->sAdd($setKey, $taskId);
|
||||||
|
|
||||||
|
// And add the attribute combo to the attributesSet
|
||||||
|
$this->conn->sAdd($this->attributesSet, $key . '_' . $val);
|
||||||
|
}
|
||||||
|
|
||||||
// Modify the unfinished set
|
// Modify the unfinished set
|
||||||
if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED ))
|
if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED ))
|
||||||
$this->conn->sRem($this->unfinishedSet, $taskId);
|
$this->conn->sRem($this->unfinishedSet, $taskId);
|
||||||
@ -212,6 +269,14 @@ class RedisTaskStorage implements TaskStorage
|
|||||||
if ($this->conn->sIsMember($this->unfinishedSet, $taskId))
|
if ($this->conn->sIsMember($this->unfinishedSet, $taskId))
|
||||||
$this->conn->sRem($this->unfinishedSet, $taskId);
|
$this->conn->sRem($this->unfinishedSet, $taskId);
|
||||||
|
|
||||||
|
// Delete all the attributes
|
||||||
|
$allLoggedAttributes = $this->conn->sMembers($this->attributesSet);
|
||||||
|
foreach ($allLoggedAttributes as $attribute)
|
||||||
|
{
|
||||||
|
if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId))
|
||||||
|
$this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId);
|
||||||
|
}
|
||||||
|
|
||||||
// Delete the task itself
|
// Delete the task itself
|
||||||
if ($this->conn->del($this->key_prefix . $taskId) > 0)
|
if ($this->conn->del($this->key_prefix . $taskId) > 0)
|
||||||
return true;
|
return true;
|
||||||
|
Loading…
Reference in New Issue
Block a user