diff --git a/src/FuzeWorks/Async/Constraint.php b/src/FuzeWorks/Async/Constraint.php index 9b3f100..32c87a7 100644 --- a/src/FuzeWorks/Async/Constraint.php +++ b/src/FuzeWorks/Async/Constraint.php @@ -44,9 +44,10 @@ interface Constraint * Return true ONLY when this constraint changes the execution of the Task. Otherwise return false. * * @param Task $task + * @param Task[] $tasks * @return bool */ - public function intervene(Task $task): bool; + public function intervene(Task $task, array $tasks): bool; /** * When intervene() returns true, this method should return the new status of the task. diff --git a/src/FuzeWorks/Async/Constraint/DependencyConstraint.php b/src/FuzeWorks/Async/Constraint/DependencyConstraint.php index ff78586..3938833 100644 --- a/src/FuzeWorks/Async/Constraint/DependencyConstraint.php +++ b/src/FuzeWorks/Async/Constraint/DependencyConstraint.php @@ -84,12 +84,12 @@ class DependencyConstraint implements Constraint /** * @inheritDoc */ - public function intervene(Task $task): bool + public function intervene(Task $task, array $tasks): bool { // Fetch taskStorage try { - $tasks = $this->loadTasksLib(); - $taskStorage = $tasks->getTaskStorage(); + $tasksLib = $this->loadTasksLib(); + $taskStorage = $tasksLib->getTaskStorage(); // Is any dependency unresolved? $hasUnresolved = false; diff --git a/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php b/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php index eb17a0c..3ecd1b0 100644 --- a/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php +++ b/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php @@ -70,7 +70,7 @@ class FixedTimeConstraint implements Constraint /** * @inheritDoc */ - public function intervene(Task $task): bool + public function intervene(Task $task, array $tasks): bool { if ($task->getStatus() === Task::PENDING && time() < $this->timestamp) return true; diff --git a/src/FuzeWorks/Async/Constraint/GroupConstraint.php b/src/FuzeWorks/Async/Constraint/GroupConstraint.php new file mode 100644 index 0000000..86e781e --- /dev/null +++ b/src/FuzeWorks/Async/Constraint/GroupConstraint.php @@ -0,0 +1,119 @@ +groupName = $groupName; + $this->maxConcurrent = $maxConcurrent; + } + + public function getGroupName(): string + { + return $this->groupName; + } + + public function intervene(Task $task, array $tasks): bool + { + // Find all tasks within this group and that are running + $runningTasks = 0; + foreach ($tasks as $t) + { + foreach ($t->getConstraints() as $constraint) + { + if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName()) + { + if ($t->getStatus() === Task::RUNNING || $t->getStatus() === Task::POST) + $runningTasks++; + } + } + + // If the running tasks exceeds the maximum, block this task from running + if ($runningTasks >= $this->maxConcurrent) + { + $this->returnStatus = Task::DELAYED; + return true; + } + } + + return false; + } + + public function blockCode(): int + { + return $this->returnStatus; + } + + public function delayTime(): int + { + return time() + 1; + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Constraint/GroupDependencyConstraint.php b/src/FuzeWorks/Async/Constraint/GroupDependencyConstraint.php new file mode 100644 index 0000000..fe8ce55 --- /dev/null +++ b/src/FuzeWorks/Async/Constraint/GroupDependencyConstraint.php @@ -0,0 +1,118 @@ +groupName = $groupName; + } + + /** + * @inheritDoc + */ + public function intervene(Task $task, array $tasks): bool + { + // Find whether any task is not completed + $hasFailed = false; + $hasUnresolved = false; + foreach ($tasks as $t) + { + foreach ($t->getConstraints() as $constraint) + { + // Check whether the constraint is a GroupConstraint + if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName()) + { + // Check whether GroupConstraint has failed + if ($t->getStatus() === Task::CANCELLED) + $hasFailed = true; + // If the task is not cancelled and not completed, there are unresolved tasks + elseif ($t->getStatus() !== Task::COMPLETED) + $hasUnresolved = true; + } + } + + // If there are failed tasks, cancel this dependent + if ($hasFailed) + { + $this->returnStatus = Task::CANCELLED; + $task->setOutput('', 'Task cancelled due to failed group dependency.'); + return true; + } + elseif ($hasUnresolved) + { + $this->returnStatus = Task::DELAYED; + return true; + } + } + + // Return the taskList + //$task->setOutput(serialize($tasks), ''); + //$this->returnStatus = Task::CANCELLED; + //return true; + + return false; + } + + /** + * @inheritDoc + */ + public function blockCode(): int + { + return $this->returnStatus; + } + + /** + * @inheritDoc + */ + public function delayTime(): int + { + return time() + 3; + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Executors/ShellExecutor.php b/src/FuzeWorks/Async/Executors/ShellExecutor.php index 247adda..8b3eaa0 100644 --- a/src/FuzeWorks/Async/Executors/ShellExecutor.php +++ b/src/FuzeWorks/Async/Executors/ShellExecutor.php @@ -59,13 +59,23 @@ class ShellExecutor implements Executor if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile'])) throw new TasksException("Could not construct ShellExecutor. Parameter failure."); + // Determine alternative worker file + $alternative = dirname(__DIR__, 4) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker'; + // Fetch workerFile - $this->worker = $parameters['workerFile']; - if (!file_exists($this->worker)) + $workerFile = null; + if (file_exists($parameters['workerFile'])) + $workerFile = $parameters['workerFile']; + elseif (file_exists($alternative)) + $workerFile = $alternative; + + // If no workerFile is found, throw an exception + if (is_null($workerFile)) throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist."); // First determine the PHP binary $this->binary = PHP_BINDIR . DS . 'php'; + $this->worker = $workerFile; $this->bootstrapFile = $parameters['bootstrapFile']; if (!file_exists($this->bootstrapFile)) throw new TasksException("Could not construct ShellExecutor. No bootstrap file found."); diff --git a/src/FuzeWorks/Async/Handler/ControllerHandler.php b/src/FuzeWorks/Async/Handler/ControllerHandler.php index 15f88d8..f6966cd 100644 --- a/src/FuzeWorks/Async/Handler/ControllerHandler.php +++ b/src/FuzeWorks/Async/Handler/ControllerHandler.php @@ -149,6 +149,9 @@ class ControllerHandler implements Handler */ public function getOutput(): string { + if (!is_string($this->output)) + return ''; + return $this->output; } @@ -189,6 +192,9 @@ class ControllerHandler implements Handler */ public function getPostOutput(): string { + if (!is_string($this->postOutput)) + return ''; + return $this->postOutput; } diff --git a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php index ee6277a..5d6d85b 100644 --- a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php +++ b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php @@ -62,6 +62,7 @@ class ParallelSuperVisor implements SuperVisor { $this->taskStorage = $taskStorage; $this->executor = $executor; + gc_enable(); } /** @@ -88,7 +89,10 @@ class ParallelSuperVisor implements SuperVisor // If the task changed status, task is no longer pending and should be processed by another statement if ($task->getStatus() !== Task::PENDING) + { + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); continue; + } // Start the process using the executor service $task = $this->executor->startTask($task); @@ -101,6 +105,7 @@ class ParallelSuperVisor implements SuperVisor } // DELAYED: If task is delayed, and enough time has passed, change the status back to pending + // @todo Don't set to pending if still constrained elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime()) { $task->setStatus(Task::PENDING); @@ -262,12 +267,19 @@ class ParallelSuperVisor implements SuperVisor return SuperVisor::RUNNING; } + /** + * @todo Here it saves the constraint code regardless of whether there is a change. Fix! + * + * @param Task $task + * @return Task + * @throws TasksException + */ private function testConstraints(Task $task): Task { $constraints = $task->getConstraints(); foreach ($constraints as $constraint) { - if ($constraint->intervene($task) && $constraint->blockCode() != 0) + if ($constraint->intervene($task, $this->tasks) && $constraint->blockCode() != 0) { $task->setStatus($constraint->blockCode()); if ($constraint->blockCode() === Task::DELAYED) @@ -275,7 +287,6 @@ class ParallelSuperVisor implements SuperVisor // Save changes to TaskStorage $this->taskStorage->modifyTask($task); - fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } } diff --git a/src/FuzeWorks/Async/Task.php b/src/FuzeWorks/Async/Task.php index 2da3945..08c95a5 100644 --- a/src/FuzeWorks/Async/Task.php +++ b/src/FuzeWorks/Async/Task.php @@ -262,6 +262,9 @@ class Task */ public function addConstraint(Constraint $constraint) { + if (method_exists($constraint, 'init')) + $constraint->init($this); + $this->constraints[] = $constraint; } @@ -319,6 +322,16 @@ class Task /* ---------------------------------- Attributes setters and getters ------------------ */ + /** + * Retrieve every attribute of this Task + * + * @return array + */ + public function getAttributes(): array + { + return $this->attributes; + } + /** * Fetch an attribute of this task * diff --git a/src/FuzeWorks/Async/TaskStorage.php b/src/FuzeWorks/Async/TaskStorage.php index f0ffafe..ddf5ef8 100644 --- a/src/FuzeWorks/Async/TaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage.php @@ -74,6 +74,15 @@ interface TaskStorage */ public function getTaskById(string $identifier): Task; + /** + * Retrieve a list of all tasks that have a certain attribute + * + * @param string $attributeKey + * @param string $attributeValue + * @return array + */ + public function getTasksByAttribute(string $attributeKey, string $attributeValue): array; + /** * Modifies a task * diff --git a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php index 1f6d3b3..352e597 100644 --- a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php @@ -54,6 +54,8 @@ class RedisTaskStorage implements TaskStorage protected $indexSet = 'async_index'; protected $unfinishedSet = 'async_index_unfinished'; + protected $attributesSet = 'async_attributes'; + protected $attributesSetPrefix = 'async_attributes_'; protected $key_prefix = 'async_task_'; /** @@ -115,6 +117,22 @@ class RedisTaskStorage implements TaskStorage if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) return false; + // And create the attributes sets + foreach ($task->getAttributes() as $key => $val) + { + // If the attribute value is not a string it can't be used as a searchable attribute + if (!is_string($val)) + continue; + + // And finally add it to the set + $setKey = $this->attributesSetPrefix . $key . '_' . $val; + $this->conn->sAdd($setKey, $taskId); + + // And add the attribute combo to the attributesSet + if (!$this->conn->sIsMember($this->attributesSet, $key . '_' . $val)) + $this->conn->sAdd($this->attributesSet, $key . '_' . $val); + } + return true; } @@ -155,6 +173,22 @@ class RedisTaskStorage implements TaskStorage return $task; } + /** + * @inheritDoc + */ + public function getTasksByAttribute(string $attributeKey, string $attributeValue): array + { + // Fetch the taskList by the attribute + $setKey = $this->attributesSetPrefix . $attributeKey . '_' . $attributeValue; + $taskList = $this->conn->sMembers($setKey); + + $tasks = []; + foreach ($taskList as $taskId) + $tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data')); + + return $tasks; + } + /** * @inheritDoc */ @@ -182,6 +216,29 @@ class RedisTaskStorage implements TaskStorage if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) return false; + // Delete all the attributes + $allLoggedAttributes = $this->conn->sMembers($this->attributesSet); + foreach ($allLoggedAttributes as $attribute) + { + if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId)) + $this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId); + } + + // And write all the attributes + foreach ($task->getAttributes() as $key => $val) + { + // If the attribute value is not a string it can't be used as a searchable attribute + if (!is_string($val)) + continue; + + // And finally add it to the set + $setKey = $this->attributesSetPrefix . $key . '_' . $val; + $this->conn->sAdd($setKey, $taskId); + + // And add the attribute combo to the attributesSet + $this->conn->sAdd($this->attributesSet, $key . '_' . $val); + } + // Modify the unfinished set if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED )) $this->conn->sRem($this->unfinishedSet, $taskId); @@ -212,6 +269,14 @@ class RedisTaskStorage implements TaskStorage if ($this->conn->sIsMember($this->unfinishedSet, $taskId)) $this->conn->sRem($this->unfinishedSet, $taskId); + // Delete all the attributes + $allLoggedAttributes = $this->conn->sMembers($this->attributesSet); + foreach ($allLoggedAttributes as $attribute) + { + if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId)) + $this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId); + } + // Delete the task itself if ($this->conn->del($this->key_prefix . $taskId) > 0) return true;