diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..1e980ae --- /dev/null +++ b/.drone.yml @@ -0,0 +1,22 @@ +kind: pipeline +type: docker +name: test + +services: + - name: cache + image: redis + +steps: + - name: composer + image: composer:latest + commands: + - composer install + + - name: redistest + image: phpunit:7.3 + commands: + - vendor/bin/phpunit -c test/phpunit.xml --coverage-php test/temp/covredis.cov + environment: + SUPERVISOR_TYPE: ParallelSuperVisor + TASKSTORAGE_TYPE: RedisTaskStorage + TASKSTORAGE_REDIS_HOST: cache \ No newline at end of file diff --git a/.gitignore b/.gitignore index 952ad32..1d58abf 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ composer.lock .idea/ log/ vendor/ +build/ +test/temp/ diff --git a/Dockerfile b/Dockerfile index f42a7bf..8aabc53 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,18 @@ -FROM php:7.3-cli-buster +FROM php:7.3-alpine -RUN apt-get update &&\ - apt-get install --no-install-recommends --assume-yes --quiet procps ca-certificates curl git &&\ - rm -rf /var/lib/apt/lists/* +# FOR ALPINE +# Install git and bash and procps +RUN apk add git bash procps +RUN apk add --no-cache --update --virtual .phpize-deps $PHPIZE_DEPS -# Install Redis -RUN pecl install redis-5.1.1 && docker-php-ext-enable redis +# FOR DEBIAN/UBUNTU +#RUN apt-get update &&\ +# apt-get install --no-install-recommends --assume-yes --quiet procps ca-certificates curl git unzip &&\ +# rm -rf /var/lib/apt/lists/* + +# Install Redis and XDebug +RUN pecl install redis && docker-php-ext-enable redis +RUN pecl install xdebug && docker-php-ext-enable xdebug # Install Composer RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer \ No newline at end of file diff --git a/bin/supervisor b/bin/supervisor index 4ffa970..0542413 100644 --- a/bin/supervisor +++ b/bin/supervisor @@ -43,11 +43,11 @@ use FuzeWorks\Exception\LibraryException; use FuzeWorks\Factory; // First perform a PHP version check -if (version_compare('7.1.0', PHP_VERSION, '>')) { +if (version_compare('7.3.0', PHP_VERSION, '>')) { fwrite( STDERR, sprintf( - 'FuzeWorks Async requires PHP 7.1 or higher.' . PHP_EOL . + 'FuzeWorks Async requires PHP 7.3 or higher.' . PHP_EOL . 'You are using PHP %s (%s).' . PHP_EOL, PHP_VERSION, PHP_BINARY @@ -109,7 +109,7 @@ try { // And finally, run the supervisor try { $supervisor = $lib->getSuperVisor($bootstrap); - while ($supervisor->cycle() === SuperVisor::RUNNING) { + while ($supervisor->cycle() !== SuperVisor::RUNNING) { usleep(250000); } diff --git a/bin/worker b/bin/worker index ecac264..c7318a3 100644 --- a/bin/worker +++ b/bin/worker @@ -42,11 +42,11 @@ use FuzeWorks\Factory; // First perform a PHP version check -if (version_compare('7.1.0', PHP_VERSION, '>')) { +if (version_compare('7.3.0', PHP_VERSION, '>')) { fwrite( STDERR, sprintf( - 'FuzeWorks Async requires PHP 7.1 or higher.' . PHP_EOL . + 'FuzeWorks Async requires PHP 7.3 or higher.' . PHP_EOL . 'You are using PHP %s (%s).' . PHP_EOL, PHP_VERSION, PHP_BINARY @@ -119,7 +119,7 @@ $post = isset($arguments['p']); // RUN THE APP $worker = $lib->getWorker(); -$worker->run($taskID, $post); +$worker->runTaskById($taskID, $post); fwrite(STDOUT,'Finished task \'' . $taskID . "'"); ?> \ No newline at end of file diff --git a/composer.json b/composer.json index 2e5c849..ad3c475 100644 --- a/composer.json +++ b/composer.json @@ -13,13 +13,20 @@ } ], "require": { - "php": ">=7.2.0", + "php": ">=7.3.0", "fuzeworks/core": "~1.2.0", "ext-json": "*", "ext-redis": "*" }, "require-dev": { - "fuzeworks/tracycomponent": "~1.2.0" + "phpunit/phpunit": "^9", + "phpunit/phpcov": "^7", + "fuzeworks/mvcr": "~1.2.0" + }, + "config": { + "platform": { + "ext-redis": "1" + } }, "autoload": { "psr-4": { diff --git a/src/FuzeWorks/Async/Constraint/DependencyConstraint.php b/src/FuzeWorks/Async/Constraint/DependencyConstraint.php index 426b1ee..ff78586 100644 --- a/src/FuzeWorks/Async/Constraint/DependencyConstraint.php +++ b/src/FuzeWorks/Async/Constraint/DependencyConstraint.php @@ -56,7 +56,7 @@ use FuzeWorks\Logger; class DependencyConstraint implements Constraint { - public $dependencies = []; + protected $dependencies = []; protected $delayTimes = 3; @@ -100,7 +100,6 @@ class DependencyConstraint implements Constraint // Get dependency $dependencyTask = $taskStorage->getTaskById($dependency); - // If the dependency task is completed, ignore it and continue to next dependency if ($dependencyTask->getStatus() === Task::COMPLETED) continue; @@ -146,6 +145,16 @@ class DependencyConstraint implements Constraint return time() + $this->delayTimes; } + /** + * Return a list of dependencies + * + * @return array + */ + public function getDependencies() + { + return $this->dependencies; + } + /** * Load the tasks library, so that dependencies can get scanned later * diff --git a/src/FuzeWorks/Async/Events/TaskModifyEvent.php b/src/FuzeWorks/Async/Events/TaskModifyEvent.php new file mode 100644 index 0000000..49e8983 --- /dev/null +++ b/src/FuzeWorks/Async/Events/TaskModifyEvent.php @@ -0,0 +1,64 @@ +task = $task; + } + + public function getTask(): Task + { + return $this->task; + } + + public function updateTask(Task $task) + { + $this->task = $task; + } + +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Executor.php b/src/FuzeWorks/Async/Executor.php index 19a3199..de2c336 100644 --- a/src/FuzeWorks/Async/Executor.php +++ b/src/FuzeWorks/Async/Executor.php @@ -36,9 +36,25 @@ namespace FuzeWorks\Async; +/** + * Interface Executor + * + * + * @todo Implement ListRunningTasks + * @package FuzeWorks\Async + */ interface Executor { + /** + * Executor constructor. + * + * Parameters are a unique array which can differ for each Executor. + * + * @param array $parameters + */ + public function __construct(array $parameters); + // Control methods /** * Start executing a task. @@ -57,16 +73,12 @@ interface Executor * Returns the task since it makes modifications. Has to be modified in TaskStorage by SuperVisor. * * @param Task $task - * @return Task + * @param bool $harshly True to KILL a process + * @return Task|null Returns modified Task on success, or null if no PID is found */ - public function stopTask(Task $task): Task; + public function stopTask(Task $task, bool $harshly = false): ?Task; // Task info public function getTaskRunning(Task $task): bool; public function getTaskStats(Task $task): ?array; - public function getTaskExitCode(Task $task): ?int; - - // All tasks info - public function getRunningTasks(): array; - } \ No newline at end of file diff --git a/src/FuzeWorks/Async/Executors/ShellExecutor.php b/src/FuzeWorks/Async/Executors/ShellExecutor.php index 87df219..247adda 100644 --- a/src/FuzeWorks/Async/Executors/ShellExecutor.php +++ b/src/FuzeWorks/Async/Executors/ShellExecutor.php @@ -36,7 +36,6 @@ namespace FuzeWorks\Async\Executors; use FuzeWorks\Async\Executor; -use FuzeWorks\Async\Process; use FuzeWorks\Async\Task; use FuzeWorks\Async\TasksException; @@ -52,51 +51,56 @@ class ShellExecutor implements Executor /** * ShellExecutor constructor. * - * @param string $bootstrapFile * @param array $parameters * @throws TasksException */ - public function __construct(string $bootstrapFile, array $parameters) + public function __construct(array $parameters) { + if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile'])) + throw new TasksException("Could not construct ShellExecutor. Parameter failure."); + // Fetch workerFile - $workerFile = $parameters['workerFile']; + $this->worker = $parameters['workerFile']; + if (!file_exists($this->worker)) + throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist."); // First determine the PHP binary $this->binary = PHP_BINDIR . DS . 'php'; - $this->bootstrapFile = $bootstrapFile; - - if (!file_exists($workerFile)) - throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist."); - - $this->worker = $workerFile; - } - - private function shellExec($format, array $parameters = []) - { - $parameters = array_map("escapeshellarg", $parameters); - array_unshift($parameters, $format); - $command = call_user_func_array("sprintf", $parameters); - exec($command, $output); - return $output; + $this->bootstrapFile = $parameters['bootstrapFile']; + if (!file_exists($this->bootstrapFile)) + throw new TasksException("Could not construct ShellExecutor. No bootstrap file found."); } public function startTask(Task $task, bool $post = false): Task { // First prepare the command used to spawn workers - $commandString = "$this->binary $this->worker --bootstrap=".$this->bootstrapFile." -t %s ".($post ? 'p' : '')." $this->stdout $this->stderr & echo $!"; + $commandString = "$this->binary $this->worker --bootstrap=".$this->bootstrapFile." -t %s ".($post ? '-p' : '')." $this->stdout $this->stderr & echo $!"; // Then execute the command using the base64_encoded string of the taskID $output = $this->shellExec($commandString, [base64_encode($task->getId())]); $pid = intval($output[0]); - $task->setProcess(new Process($pid)); + $task->addAttribute('pid', $pid); // And finally return the task return $task; } - public function stopTask(Task $task): Task + public function stopTask(Task $task, bool $harshly = false): ?Task { - // TODO: Implement stopTask() method. + // First prepare the kill command + $commandString = "kill " . ($harshly ? '-9 ' : '') . "%s"; + + // Fetch the process ID from the task + $pid = $task->attribute('pid'); + if (is_null($pid)) + return null; + + // Then execute the command + $this->shellExec($commandString, [$pid]); + if (!$this->getTaskRunning($task)) + $task->removeAttribute('pid'); + + return $task; } public function getTaskRunning(Task $task): bool @@ -111,13 +115,10 @@ class ShellExecutor implements Executor $commandString = "ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d"; // First we must determine what process is used. - $process = $task->getProcess(); - if (is_null($process)) + $pid = $task->attribute('pid'); + if (is_null($pid)) return null; - // Then using that process we determine the ProcessID - $pid = $process->getPid(); - // And we execute the commandString to fetch info on the process $output = $this->shellExec($commandString, [$pid]); @@ -141,16 +142,15 @@ class ShellExecutor implements Executor return null; // Finally, return the Task information - return $parts; + return ['pid' => (int) $parts[0], 'cpu' => (float) $parts[1], 'mem' => (float) $parts[2], 'state' => $parts[3], 'start' => $parts[4]]; } - public function getTaskExitCode(Task $task): int + protected function shellExec($format, array $parameters = []) { - // TODO: Implement getTaskExitCode() method. - } - - public function getRunningTasks(): array - { - // TODO: Implement getRunningTasks() method. + $parameters = array_map("escapeshellarg", $parameters); + array_unshift($parameters, $format); + $command = call_user_func_array("sprintf", $parameters); + exec($command, $output); + return $output; } } \ No newline at end of file diff --git a/src/FuzeWorks/Async/Handler.php b/src/FuzeWorks/Async/Handler.php index fe7d4ec..6e3c570 100644 --- a/src/FuzeWorks/Async/Handler.php +++ b/src/FuzeWorks/Async/Handler.php @@ -38,6 +38,36 @@ namespace FuzeWorks\Async; interface Handler { + /** + * Gets invoked upon being added to the Task + * + * @param Task $task + * @return mixed + * @throws TasksException + */ + public function init(Task $task); + + /** + * Retrieve the parent handler that will first handle this task, before this child Handler + * + * @return Handler|null + */ + public function getParentHandler(): ?Handler; + + /** + * Set the parent handler that will fire before this Handler + * + * @param Handler $parentHandler + */ + public function setParentHandler(Handler $parentHandler): void; + + /** + * Import the parent output into the child + * + * @param string $input + */ + public function setParentInput(string $input): void; + /** * The handler method used to handle this task. * This handler will execute the actual task. @@ -52,9 +82,9 @@ interface Handler /** * Any output generated by primaryHandler should be returned here. * - * @return mixed + * @return string */ - public function getOutput(); + public function getOutput(): string; /** * The handler method used after the primaryHandler if so requested @@ -69,8 +99,8 @@ interface Handler /** * Any output generated by postHandler should be returned here * - * @return mixed + * @return string */ - public function getPostOutput(); + public function getPostOutput(): string; } \ No newline at end of file diff --git a/src/FuzeWorks/Async/Handler/ControllerHandler.php b/src/FuzeWorks/Async/Handler/ControllerHandler.php new file mode 100644 index 0000000..15f88d8 --- /dev/null +++ b/src/FuzeWorks/Async/Handler/ControllerHandler.php @@ -0,0 +1,245 @@ +controllerName = $controllerName; + $this->controllerMethod = $controllerMethod; + $this->postMethod = $postMethod; + $this->controllerNamespace = $controllerNamespace; + } + + /** + * @inheritDoc + */ + public function init(Task $task) + { + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function primaryHandler(Task $task): bool + { + // Set the arguments + $args = $task->getArguments(); + array_unshift($args, $task); + + // First we fetch the controller + $controller = $this->getController(); + + // Check if method exists + if (!method_exists($controller, $this->controllerMethod)) + throw new TasksException("Could not handle task. Method '$this->controllerMethod' not found on controller."); + + if (!method_exists($controller, 'getTaskStatus')) + throw new TasksException("Could not handle task. Method 'getTaskStatus()' not found on controller, which is required."); + + if ($this->parentInput !== null && method_exists($controller, 'setInput')) + $controller->setInput($this->parentInput); + + // Call method and collect output + $this->output = call_user_func_array([$controller, $this->controllerMethod], $args); + $success = $controller->getTaskStatus(); + if (!is_bool($success)) + throw new TasksException("Could not determine whether task has succeeded. getTaskStatus() returned non-bool."); + + return $success; + } + + /** + * @inheritDoc + */ + public function getOutput(): string + { + return $this->output; + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function postHandler(Task $task) + { + // Abort if no postMethod exists + if (is_null($this->postMethod)) + throw new TasksException("Could not handle task. No post method provided."); + + // First we fetch the controller + $controller = $this->getController(); + + // Check if method exists + if (!method_exists($controller, $this->postMethod)) + throw new TasksException("Could not handle task. Post method '$this->postMethod' not found on controller."); + + if (!method_exists($controller, 'getTaskStatus')) + throw new TasksException("Could not handle task. Method 'getTaskStatus()' not found on controller, which is required."); + + if ($this->parentInput !== null && method_exists($controller, 'setInput')) + $controller->setInput($this->parentInput); + + // Call method and collect output + $this->postOutput = call_user_func_array([$controller, $this->postMethod], [$task]); + $success = $controller->getTaskStatus(); + if (!is_bool($success)) + throw new TasksException("Could not determine whether task has succeeded. getTaskStatus() returned non-bool."); + + return $success; + } + + /** + * @inheritDoc + */ + public function getPostOutput(): string + { + return $this->postOutput; + } + + /** + * @return Controller + * @throws TasksException + */ + private function getController(): Controller + { + // First load the controllers component + try { + /** @var Controllers $controllers */ + $controllers = Factory::getInstance('controllers'); + + // Load the requested controller + return $controllers->get($this->controllerName, [], $this->controllerNamespace); + } catch (FactoryException $e) { + throw new TasksException("Could not get controller. FuzeWorks\MVCR is not installed!"); + } catch (ControllerException $e) { + throw new TasksException("Could not get controller. Controller threw exception: '" . $e->getMessage() . "'"); + } catch (NotFoundException $e) { + throw new TasksException("Could not get controller. Controller was not found."); + } + } + + /** + * @var Handler + */ + private $parentHandler; + + /** + * @inheritDoc + */ + public function getParentHandler(): ?Handler + { + return $this->parentHandler; + } + + /** + * @inheritDoc + */ + public function setParentHandler(Handler $parentHandler): void + { + $this->parentHandler = $parentHandler; + } + + /** + * @inheritDoc + */ + public function setParentInput(string $input): void + { + $this->parentInput = $input; + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Handler/DependentTaskHandler.php b/src/FuzeWorks/Async/Handler/DependentTaskHandler.php new file mode 100644 index 0000000..e93e9fa --- /dev/null +++ b/src/FuzeWorks/Async/Handler/DependentTaskHandler.php @@ -0,0 +1,250 @@ +dependencyList = $dependencyList; + $this->delayTimes = $delayTimes; + } + + /** + * @inheritDoc + */ + public function init(Task $task) + { + if (!empty($this->dependencyList)) + $task->addConstraint(new DependencyConstraint($this->dependencyList, $this->delayTimes)); + } + + /** + * @inheritDoc + */ + public function primaryHandler(Task $task): bool + { + // First find all the dependencies + try { + $dependencies = $this->fetchDependencies($task); + $this->output = json_encode($dependencies); + return true; + } catch (TasksException $e) { + $this->output = 'Failed to fetch dependencies. TasksException: ' . $e->getMessage(); + return false; + } + } + + /** + * @inheritDoc + */ + public function getOutput(): string + { + return $this->output; + } + + /** + * @inheritDoc + */ + public function postHandler(Task $task) + { + // First find all the dependencies + try { + $dependencies = $this->fetchDependencies($task); + $this->output = json_encode($dependencies); + return true; + } catch (TasksException $e) { + $this->output = 'Failed to fetch dependencies. TasksException: ' . $e->getMessage(); + return false; + } + } + + /** + * @inheritDoc + */ + public function getPostOutput(): string + { + return $this->output; + } + + /** + * @inheritDoc + */ + public function getParentHandler(): ?Handler + { + return $this->parentHandler; + } + + /** + * @inheritDoc + */ + public function setParentHandler(Handler $parentHandler): void + { + $this->parentHandler = $parentHandler; + } + + /** + * @inheritDoc + */ + public function setParentInput(string $input): void + { + // Parent output gets set at this handler's output. + // Only if this class has something to intervene it will override the parent output + // Which should be always... but alas. + $this->output = $input; + } + + /** + * @param Task $task + * @return array + * @throws TasksException + */ + protected function fetchDependencies(Task $task): array + { + // When it receives the task, all dependencies should already be handled + // the primary handler will therefore connect with the DependencyConstraint and fetch dependencies + $constraints = $task->getConstraints(); + + // First prepare a list of dependencies + $dependencies = []; + $dependencyConstraints = []; + foreach ($constraints as $constraint) { + if ($constraint instanceof Constraint) + $dependencyConstraints[] = $constraint; + } + + // If no dependencies found, throw exception + if (empty($dependencyConstraints)) + return $dependencies; + + // Afterwards build a list of dependencies + /** @var DependencyConstraint $constraint */ + foreach ($dependencyConstraints as $constraint) { + foreach ($constraint->getDependencies() as $dependency) { + if (!isset($dependencies[$dependency])) + $dependencies[$dependency] = []; + } + } + + // Now that all dependencies are determined, fetch all the output + $tasks = $this->loadTasksLib(); + $taskStorage = $tasks->getTaskStorage(); + foreach ($dependencies as $dependency => $data) + { + // Fetch the task + try { + $dependencyTask = $taskStorage->getTaskById($dependency); + + // Then fetch all output + $dependencies[$dependency]['status'] = $dependencyTask->getStatus(); + $dependencies[$dependency]['output'] = $dependencyTask->getOutput(); + $dependencies[$dependency]['errors'] = $dependencyTask->getErrors(); + $dependencies[$dependency]['post'] = $dependencyTask->getPostOutput(); + $dependencies[$dependency]['postErrors'] = $dependencyTask->getPostErrors(); + } catch (TasksException $e) { + $dependencies[$dependency]['status'] = Task::FAILED; + $dependencies[$dependency]['output'] = null; + $dependencies[$dependency]['errors'] = 'Task not found.'; + $dependencies[$dependency]['post'] = null; + $dependencies[$dependency]['postErrors'] = null; + } + } + + return $dependencies; + } + + /** + * Load the tasks library, so that dependencies can get scanned later + * + * @return Tasks + * @throws TasksException + */ + private function loadTasksLib(): Tasks + { + try { + /** @var Libraries $libraries */ + $libraries = Factory::getInstance('libraries'); + + /** @var Tasks $tasks */ + $tasks = $libraries->get('async'); + + return $tasks; + } catch (FactoryException | LibraryException $e) { + throw new TasksException("Could not constrain task. Async library could not be loaded."); + } + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/ShellWorker.php b/src/FuzeWorks/Async/ShellWorker.php index a614541..6d8b452 100644 --- a/src/FuzeWorks/Async/ShellWorker.php +++ b/src/FuzeWorks/Async/ShellWorker.php @@ -67,88 +67,84 @@ class ShellWorker } /** - * @param string $taskID + * Run a task by finding its ID + * + * @param string $taskId * @param bool $post * @throws EventException * @throws TasksException */ - public function run(string $taskID, bool $post = false) + public function runTaskById(string $taskId, bool $post = false) { // First fetch the task try { - $task = $this->taskStorage->getTaskById($taskID); + $task = $this->taskStorage->getTaskById($taskId); } catch (TasksException $e) { throw new TasksException("Could not run worker. Task not found."); } + $this->run($task, $post); + } + + /** + * @param Task $task + * @param bool $post + * @throws EventException + * @throws TasksException + */ + public function run(Task $task, bool $post = false) + { // Fire a taskHandleEvent /** @var TaskHandleEvent $event */ $event = Events::fireEvent(new TaskHandleEvent(), $task); - $task = $event->getTask(); // Set task to this worker - $this->task = $task; + $this->task = $event->getTask(); $this->post = $post; // Fetch the callable - $class = $this->task->getHandlerClass(); - if (!class_exists($class, true)) - { - $errors = 'Could not run task. HandlerClass \'' . $class . '\' not found.'; - if (!$post) - $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); - else - $this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); + $handler = $this->task->getHandler(); - throw new TasksException("Could not run task. '$class' not found."); - } + // Execute the handler and all its parent handlers + $success = $this->executeHandler($this->task, $handler, $post); - // Create the handler - /** @var Handler $object */ - $object = new $class(); - if (!$object instanceof Handler) - { - $errors = "Could not run task. '$class' is not instance of Handler."; - if (!$post) - $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); - else - $this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); - - throw new TasksException("Could not run task. '$class' is not instance of Handler."); - } - - // Run postHandler if post mode is requested - if ($post) - { - $postSuccess = $object->postHandler($this->task); - $postOutput = $object->getPostOutput(); - $postOutput = is_null($postOutput) ? '' : (string) $postOutput; - $postErrors = $this->getErrors(); - - if (!$postSuccess) - $this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::FAILED, $this->task->getRetries()); - else - $this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::SUCCESS, $this->task->getRetries()); - - $this->output($postOutput, $postErrors); - return; - } - - // Run primaryHandler if requested - $success = $object->primaryHandler($this->task); - $output = $object->getOutput(); - $output = is_null($output) ? '' : (string) $output; + // Fetch the output and errors + $output = $post ? $handler->getPostOutput() : $handler->getOutput(); + $output = is_null($output) ? '' : $output; $errors = $this->getErrors(); - // And afterwards write the results to the TaskStorage - if (!$success) - $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries()); + // If the task failed, write so to task storage, based on whether this is a post request or not + if (!$success && $post) + $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED); + elseif (!$success && !$post) + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED); + elseif ($success && $post) + $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS); else - $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries()); + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS); - $this->output($output, $errors); + // And write the final output + $this->output((string) $output, $errors); } + protected function executeHandler(Task $task, Handler $handler, bool $usePost = false): bool + { + // First check to see if there is a parent handler + $parent = $handler->getParentHandler(); + if (!is_null($parent)) { + // Execute the parent + if ($this->executeHandler($task, $parent, $usePost) === false) + return false; + + // Fetch the output of the parent + $output = $usePost ? $parent->getPostOutput() : $parent->getOutput(); + + // And insert it as input into the child handler + $handler->setParentInput($output); + } + + return $usePost ? $handler->postHandler($task) : $handler->primaryHandler($task); + } /** * In case a fatal error or exception occurs, the errors shall be redirected to stderr * @@ -170,9 +166,9 @@ class ShellWorker try { // Write to TaskStorage if (!$this->post) - $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED); else - $this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED); } catch (TasksException $e) { // Ignore } diff --git a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php index 0408ec8..ee6277a 100644 --- a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php +++ b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php @@ -70,8 +70,7 @@ class ParallelSuperVisor implements SuperVisor public function cycle(): int { // First: if there are no tasks, load them - $this->taskStorage->refreshTasks(); - $this->tasks = $this->taskStorage->readTasks(); + $this->tasks = $this->taskStorage->readTasks(true); // If there are still no tasks, nothing is queued, so this cycle can end. if (empty($this->tasks)) @@ -94,6 +93,7 @@ class ParallelSuperVisor implements SuperVisor // Start the process using the executor service $task = $this->executor->startTask($task); $task->setStatus(Task::RUNNING); + $task->startTaskTime(); // Modify the task in TaskStorage $this->taskStorage->modifyTask($task); @@ -108,16 +108,11 @@ class ParallelSuperVisor implements SuperVisor fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } - // CANCELLED/COMPLETED: remove the task if requested to do so - elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED) - { - } - // RUNNING: check if task is still running. If not, set result based on output elseif ($task->getStatus() === Task::RUNNING) { $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readTaskOutput($task, $task->getRetries()); + $output = $this->taskStorage->readTaskOutput($task); $hasOutput = !is_null($output); // If nothing is found, the process has crashed and status PFAILED should be set @@ -140,6 +135,7 @@ class ParallelSuperVisor implements SuperVisor continue; // If any changes have been made, they should be written to TaskStorage + $task->endTaskTime(); $this->taskStorage->modifyTask($task); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } @@ -148,7 +144,7 @@ class ParallelSuperVisor implements SuperVisor elseif ($task->getStatus() === Task::PFAILED || $task->getStatus() === Task::FAILED) { // First fetch retry conditions - $settings = $task->getRetrySettings(); + $settings = $task->getSettings(); // First test if any retries should be tried at all if ($settings['retryOnFail'] === true && $task->getRetries() < $settings['maxRetries']) @@ -163,6 +159,7 @@ class ParallelSuperVisor implements SuperVisor $task->addRetry(); $task = $this->executor->startTask($task); $task->setStatus(Task::RUNNING); + $task->startTaskTime(); $this->taskStorage->modifyTask($task); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); continue; @@ -174,6 +171,7 @@ class ParallelSuperVisor implements SuperVisor { $task->resetRetries(); $task = $this->executor->startTask($task, true); + $task->startPostTime(); $task->setStatus(Task::POST); } else @@ -190,6 +188,7 @@ class ParallelSuperVisor implements SuperVisor { $task->resetRetries(); $task = $this->executor->startTask($task, true); + $task->startPostTime(); $task->setStatus(Task::POST); } else @@ -203,20 +202,22 @@ class ParallelSuperVisor implements SuperVisor elseif ($task->getStatus() === Task::POST) { $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readPostOutput($task, $task->getRetries()); + $output = $this->taskStorage->readPostOutput($task); $hasOutput = !is_null($output); // If a task is not running and has no output, an error has occurred if (!$isRunning && !$hasOutput) { // Test if a retry should be attempted - $settings = $task->getRetrySettings(); + $settings = $task->getSettings(); if ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] > $task->getRetries()) { $task->addRetry(); $task = $this->executor->startTask($task, true); } - elseif ($settings['maxRetries'] <= $task->getRetries()) + elseif ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] <= $task->getRetries()) + $task->setStatus(Task::CANCELLED); + else $task->setStatus(Task::CANCELLED); } // @todo Retry after $max_Time @@ -224,13 +225,17 @@ class ParallelSuperVisor implements SuperVisor elseif (!$isRunning && $hasOutput) { $task->setPostOutput($output['output'], $output['errors']); - $task->setStatus(Task::COMPLETED); + if ($output['statusCode'] === Task::SUCCESS) + $task->setStatus(Task::COMPLETED); + else + $task->setStatus(Task::CANCELLED); } // If the task is still running, leave it be else continue; // If any changes have been made, they should be written to TaskStorage + $task->endPostTime(); $this->taskStorage->modifyTask($task); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } diff --git a/src/FuzeWorks/Async/Task.php b/src/FuzeWorks/Async/Task.php index 3367929..2da3945 100644 --- a/src/FuzeWorks/Async/Task.php +++ b/src/FuzeWorks/Async/Task.php @@ -116,14 +116,9 @@ class Task protected $taskId; /** - * @var string + * @var int */ - protected $handlerClass; - - /** - * @var bool - */ - protected $usePostHandler = false; + protected $status = Task::PENDING; /** * @var array @@ -131,9 +126,14 @@ class Task protected $arguments; /** - * @var int + * @var Handler */ - protected $status = Task::PENDING; + protected $handler; + + /** + * @var bool + */ + protected $usePostHandler = false; /** * @var Constraint[] @@ -170,11 +170,6 @@ class Task */ protected $attributes = []; - /** - * @var Process - */ - protected $process; - /* -------- Some settings ------------ */ protected $retryOnFail = false; @@ -183,22 +178,27 @@ class Task protected $retryRFailures = true; protected $retryPostFailures = true; protected $retries = 0; + protected $maxTime = 30; /** * Task constructor. * * Creates a Task object, which can be added to the TaskQueue. * - * @param string $identifier The unique identifier of this task. Make sure it is always unique! - * @param string $handlerClass The class that shall handle this task - * @param bool $usePostHandler Whether the postHandler on handlerClass should also be used + * @param string $identifier The unique identifier of this task. Make sure it is always unique! + * @param Handler $handler The Handler object which will run the Task in the Worker + * @param bool $usePostHandler Whether the postHandler on Handler should also be used * @param mixed $parameters,... The arguments provided to the method that shall handle this class * @throws TasksException */ - public function __construct(string $identifier, string $handlerClass, bool $usePostHandler = false) + public function __construct(string $identifier, Handler $handler, bool $usePostHandler = false) { + // Check if the provided Handler is serializable + if (!$this->isSerializable($handler)) + throw new TasksException("Could not create Task. Provided Handler is not serializable."); + $this->taskId = $identifier; - $this->handlerClass = $handlerClass; + $this->handler = $handler; $this->usePostHandler = $usePostHandler; if (func_num_args() > 3) $args = array_slice(func_get_args(), 3); @@ -210,6 +210,9 @@ class Task throw new TasksException("Could not create Task. Provided arguments are not serializable."); $this->arguments = $args; + + // Init the handler + $this->handler->init($this); } /** @@ -223,17 +226,17 @@ class Task } /** - * Gets the name of the class that shall process this task + * Gets the Handler that shall process this task * - * @return string + * @return Handler */ - public function getHandlerClass(): string + public function getHandler(): Handler { - return $this->handlerClass; + return $this->handler; } /** - * Whether the postHandler on the handlerClass should be invoked after processing the initial task. + * Whether the postHandler on the Handler should be invoked after processing the initial task. * * @return bool */ @@ -314,6 +317,8 @@ class Task return $this->delayTime; } + /* ---------------------------------- Attributes setters and getters ------------------ */ + /** * Fetch an attribute of this task * @@ -343,6 +348,22 @@ class Task $this->attributes[$key] = $value; } + /** + * Remove an attribute from a Task + * + * @param string $key + * @throws TasksException + */ + public function removeAttribute(string $key) + { + if (!isset($this->attributes[$key])) + throw new TasksException("Could not remove Task '$this->taskId' attribute '$key'. Not found."); + + unset($this->attributes[$key]); + } + + /* ---------------------------------- Output setters and getters ---------------------- */ + /** * Return the output of this task execution * @@ -373,11 +394,10 @@ class Task return $this->postErrors; } - /** - * @todo Handle output from multiple attempts * @param string $output * @param string $errors + * @todo Handle output from multiple attempts */ public function setOutput(string $output, string $errors) { @@ -386,9 +406,9 @@ class Task } /** - * @todo Handle output from multiple attempts * @param string $output * @param string $errors + * @todo Handle output from multiple attempts */ public function setPostOutput(string $output, string $errors) { @@ -396,55 +416,26 @@ class Task $this->postErrors = $errors; } - /** - * Sets the initial process for this task. - * - * To be set by Executor - * - * @param Process $process - */ - public function setProcess(Process $process) - { - $this->process = $process; - } - - /** - * Returns the initial process for this task - * - * @return Process|null - */ - public function getProcess(): ?Process - { - return $this->process; - } - - public function removeProcess(): bool - { - if ($this->process instanceof Process) - { - $this->process = null; - return true; - } - - return false; - } + /* ---------------------------------- Failure settings and criteria ------------------- */ /** * Set whether this task should retry after a failure, and how many times * - * @param bool $retryOnFail - * @param int $maxRetries - * @param bool $retryRegularFailures - * @param bool $retryProcessFailures - * @param bool $retryPostFailures + * @param bool $retryOnFail Whether this task should be retried if failing + * @param int $maxRetries How many times the task should be retried + * @param int $maxTime How long a task may run before it shall be forcefully shut down + * @param bool $retryRegularFailures Whether regular Task::FAILED should be retried + * @param bool $retryProcessFailures Whether process based Task::PFAILED should be retried + * @param bool $retryPostFailures Whether failures during the Task::POST phase should be retried. */ - public function setRetrySettings(bool $retryOnFail, int $maxRetries = 2, bool $retryRegularFailures = true, bool $retryProcessFailures = true, bool $retryPostFailures = true) + public function setSettings(bool $retryOnFail, int $maxRetries = 2, int $maxTime = 30, bool $retryRegularFailures = true, bool $retryProcessFailures = true, bool $retryPostFailures = true) { $this->retryOnFail = $retryOnFail; $this->maxRetries = $maxRetries; $this->retryPFailures = $retryProcessFailures; $this->retryRFailures = $retryRegularFailures; $this->retryPostFailures = $retryPostFailures; + $this->maxTime = $maxTime; } /** @@ -452,17 +443,20 @@ class Task * * @return array */ - public function getRetrySettings(): array + public function getSettings(): array { return [ 'retryOnFail' => $this->retryOnFail, 'maxRetries' => $this->maxRetries, 'retryPFailures' => $this->retryPFailures, 'retryRFailures' => $this->retryRFailures, - 'retryPostFailures' => $this->retryPostFailures + 'retryPostFailures' => $this->retryPostFailures, + 'maxTime' => $this->maxTime ]; } + /* ---------------------------------- Retries and attempts registers ------------------ */ + /** * Add a retry to the retry counter */ @@ -489,13 +483,66 @@ class Task return $this->retries; } + /* ---------------------------------- Runtime data getters and setters----------------- */ + + protected $taskStartTime; + protected $taskEndTime; + protected $postStartTime; + protected $postEndTime; + + public function startTaskTime() + { + $this->taskEndTime = null; + $this->taskStartTime = time(); + } + + public function endTaskTime() + { + $this->taskEndTime = time(); + } + + public function getTaskTime(): ?int + { + if (is_null($this->taskStartTime)) + return null; + + if (is_null($this->taskEndTime)) + return time() - $this->taskStartTime; + + return $this->taskEndTime - $this->taskStartTime; + } + + public function startPostTime() + { + $this->postEndTime = null; + $this->postStartTime = time(); + } + + public function endPostTime() + { + $this->postEndTime = time(); + } + + public function getPostTime(): ?int + { + if (is_null($this->postStartTime)) + return null; + + if (is_null($this->postEndTime)) + return time() - $this->postStartTime; + + return $this->postEndTime - $this->postStartTime; + } + /** * Checks whether an object can be serialized * * @param $value * @return bool + * @todo Improve so it is properly tested */ - private function isSerializable ($value) { + private function isSerializable($value) + { $return = true; $arr = array($value); diff --git a/src/FuzeWorks/Async/TaskStorage.php b/src/FuzeWorks/Async/TaskStorage.php index 6548240..f0ffafe 100644 --- a/src/FuzeWorks/Async/TaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage.php @@ -58,14 +58,12 @@ interface TaskStorage /** * Retrieves a list of all tasks logged in the system * + * When using $noIncludeDone, Tasks that are Completed and Cancelled will not be returned. + * + * @param bool $noIncludeDone * @return Task[] */ - public function readTasks(): array; - - /** - * Reload the list of all tasks. - */ - public function refreshTasks(); + public function readTasks(bool $noIncludeDone = false): array; /** * Retrieves an individual task by its identifier @@ -79,6 +77,11 @@ interface TaskStorage /** * Modifies a task * + * Should first check if the Task exists in TaskStorage. If not, it should throw a TasksException. + * + * Modifies a Task in TaskStorage. Should fire a TaskModifyEvent and cancel the edit if cancelled by the event. + * If cancelled, should return zero. Preferably be logged as well. + * * @param Task $task * @return bool * @throws TasksException @@ -102,11 +105,10 @@ interface TaskStorage * @param string $output * @param string $errors * @param int $statusCode - * @param int $attempt * @return bool * @throws TasksException */ - public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool; /** * Write the output of the postHandler into TaskStorage @@ -117,11 +119,10 @@ interface TaskStorage * @param string $output * @param string $errors * @param int $statusCode - * @param int $attempt * @return bool * @throws TasksException */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool; /** * Read the task output from taskStorage. @@ -130,11 +131,15 @@ interface TaskStorage * array('output' => string $output, 'errors' => string $errors, 'status' => $code) * OR null if not found (yet) * - * $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. + * Attempt refers to the attempt that the task has ran and that individual output. + * If > 0, the output as mentioned above will be returned. If = 0, an array of such outputs will be returned. + * + * Returns null because that is a very valid response. Oftentimes output will need to be checked and its undesirable + * to always throw an exception for expected behaviour. * * @param Task $task * @param int $attempt - * @return array + * @return array|null */ public function readTaskOutput(Task $task, int $attempt = 0): ?array; @@ -145,11 +150,24 @@ interface TaskStorage * array('output' => string $output, 'errors' => string $errors, 'status' => $code) * OR null if not found (yet) * - * $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. + * Attempt refers to the attempt that the task has ran and that individual output. + * If > 0, the output as mentioned above will be returned. If = 0, an array of such outputs will be returned. + * + * Returns null because that is a very valid response. Oftentimes output will need to be checked and its undesirable + * to always throw an exception for expected behaviour. * * @param Task $task * @param int $attempt * @return array|null */ public function readPostOutput(Task $task, int $attempt = 0): ?array; + + /** + * Reset the TaskStorage. + * + * Remove all tasks and their output from the storage so the TaskStorage begins anew. + * + * @return bool + */ + public function reset(): bool; } \ No newline at end of file diff --git a/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php index bbc0269..6946379 100644 --- a/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php @@ -168,10 +168,20 @@ class ArrayTaskStorage implements TaskStorage unset($this->tasks[$i]); $this->commit(); - // Afterwards remove the task output, if it exists - $file = dirname($this->fileName) . DS . 'task_' . md5($taskId) . '_output.json'; - if (file_exists($file)) - unlink($file); + // Remove all task output and post output + $settings = $task->getSettings(); + $maxRetries = $settings['maxRetries']; + for ($j=0;$j<=$maxRetries;$j++) + { + // First remove all possible task output + $outFile = dirname($this->fileName) . DS . 'task_' . md5($taskId) . '_' . $j . '_output.json'; + if (file_exists($outFile)) + unlink($outFile); + + $postFile = dirname($this->fileName) . DS . 'task_' . md5($taskId) . '_' . $j . '_post_output.json'; + if (file_exists($postFile)) + unlink($postFile); + } return true; } @@ -262,6 +272,44 @@ class ArrayTaskStorage implements TaskStorage return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; } + /** + * @inheritDoc + * @throws TasksException + */ + public function reset(): bool + { + // Delete everything + $this->refreshTasks(); + for ($i=0;$itasks);$i++) + { + // Get the task + $task = $this->tasks[$i]; + $taskId = $task->getId(); + + // Remove all task output and post output + $settings = $task->getSettings(); + $maxRetries = $settings['maxRetries']; + for ($j=0;$j<=$maxRetries;$j++) + { + // First remove all possible task output + $outFile = dirname($this->fileName) . DS . 'task_' . md5($taskId) . '_' . $j . '_output.json'; + if (file_exists($outFile)) + unlink($outFile); + + $postFile = dirname($this->fileName) . DS . 'task_' . md5($taskId) . '_' . $j . '_post_output.json'; + if (file_exists($postFile)) + unlink($postFile); + } + + // Remove the task from the main storage + unset($this->tasks[$i]); + } + + // And finally commit + $this->commit(); + return true; + } + private function commit() { $this->data = ['tasks' => []]; diff --git a/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php new file mode 100644 index 0000000..460b860 --- /dev/null +++ b/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php @@ -0,0 +1,291 @@ +tasks = array(); + $this->taskOutput = array(); + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function addTask(Task $task): bool + { + // Check if the already exists + $taskId = $task->getId(); + foreach ($this->tasks as $t) + { + if ($t->getId() === $taskId) + throw new TasksException("Could not add Task to TaskStorage. Task '$taskId' already exists."); + } + + $this->tasks[] = clone $task; + return true; + } + + /** + * @inheritDoc + */ + public function readTasks(bool $noIncludeDone = false): array + { + if ($noIncludeDone === false) + return $this->tasks; + + $tasks = []; + foreach ($this->tasks as $task) + { + if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED) + $tasks[] = $task; + } + + return $tasks; + } + + /** + * @inheritDoc + */ + public function getTaskById(string $identifier): Task + { + foreach ($this->tasks as $t) + if ($t->getId() === $identifier) + return $t; + + throw new TasksException("Could not get task by id. Task not found."); + } + + /** + * @inheritDoc + */ + public function modifyTask(Task $task): bool + { + // Fire the TaskModifyEvent + /** @var TaskModifyEvent $event */ + $event = Events::fireEvent(new TaskModifyEvent(), $task); + if ($event->isCancelled()) + { + Logger::log("Did not modify task. Cancelled by taskModifyEvent."); + return false; + } + + // And finally replace Task with the event based one. + $task = $event->getTask(); + + $taskId = $task->getId(); + for ($i=0;$itasks);$i++) + { + if ($this->tasks[$i]->getId() === $taskId) + { + $this->tasks[$i] = clone $task; + return true; + } + } + + throw new TasksException("Could not modify task. Task '$taskId' doesn't exist."); + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function deleteTask(Task $task): bool + { + $taskId = $task->getId(); + for ($i=0;$itasks);$i++) + { + if ($this->tasks[$i]->getId() === $taskId) + { + // Remove the task from the main storage + unset($this->tasks[$i]); + return true; + } + } + + throw new TasksException("Could not delete task. Task '$taskId' doesn't exist."); + } + + /** + * @inheritDoc + */ + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool + { + // First check if the task exists + $task = $this->getTaskById($task->getId()); + + // Set the attempt number + if (!isset($this->taskOutput[$task->getId()]['outAttempts'])) + { + $this->taskOutput[$task->getId()]['outAttempts'] = 1; + $attempt = $this->taskOutput[$task->getId()]['outAttempts']; + } + else + $attempt = $this->taskOutput[$task->getId()]['outAttempts']++; + + if (isset($this->taskOutput[$task->getId()]['task'][$attempt])) + throw new TasksException("Could not write task output. Output already written."); + + $this->taskOutput[$task->getId()]['task'][$attempt] = [ + 'output' => $output, + 'errors' => $errors, + 'statusCode' => $statusCode + ]; + + return true; + } + + /** + * @inheritDoc + */ + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool + { + // First check if the task exists + $task = $this->getTaskById($task->getId()); + + // Set the attempt number + if (!isset($this->taskOutput[$task->getId()]['postAttempts'])) + { + $this->taskOutput[$task->getId()]['postAttempts'] = 1; + $attempt = $this->taskOutput[$task->getId()]['postAttempts']; + } + else + $attempt = $this->taskOutput[$task->getId()]['postAttempts']++; + + if (isset($this->taskOutput[$task->getId()]['post'][$attempt])) + throw new TasksException("Could not write task post output. Output already written."); + + $this->taskOutput[$task->getId()]['post'][$attempt] = [ + 'output' => $output, + 'errors' => $errors, + 'statusCode' => $statusCode + ]; + + return true; + } + + /** + * @inheritDoc + */ + public function readTaskOutput(Task $task, int $attempt = 0): ?array + { + if (!isset($this->taskOutput[$task->getId()]['task'])) + return null; + + if ($attempt === 0) + $attempt = count($this->taskOutput[$task->getId()]['task']); + + if ($attempt === -1) + return $this->taskOutput[$task->getId()]['task']; + else + { + if (isset($this->taskOutput[$task->getId()]['task'][$attempt])) + return $this->taskOutput[$task->getId()]['task'][$attempt]; + } + + return null; + } + + /** + * @inheritDoc + */ + public function readPostOutput(Task $task, int $attempt = 0): ?array + { + if (!isset($this->taskOutput[$task->getId()]['post'])) + return null; + + if ($attempt === 0) + $attempt = count($this->taskOutput[$task->getId()]['post']); + + if ($attempt === -1) + return $this->taskOutput[$task->getId()]['post']; + else + { + if (isset($this->taskOutput[$task->getId()]['post'][$attempt])) + return $this->taskOutput[$task->getId()]['post'][$attempt]; + } + + return null; + } + + /** + * @inheritDoc + */ + public function reset(): bool + { + $this->tasks = []; + $this->taskOutput = []; + return true; + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php index 8f0d000..1f6d3b3 100644 --- a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php @@ -35,9 +35,12 @@ */ namespace FuzeWorks\Async\TaskStorage; +use FuzeWorks\Async\Events\TaskModifyEvent; use FuzeWorks\Async\Task; use FuzeWorks\Async\TasksException; use FuzeWorks\Async\TaskStorage; +use FuzeWorks\Events; +use FuzeWorks\Logger; use Redis; use RedisException; @@ -50,6 +53,7 @@ class RedisTaskStorage implements TaskStorage protected $conn; protected $indexSet = 'async_index'; + protected $unfinishedSet = 'async_index_unfinished'; protected $key_prefix = 'async_task_'; /** @@ -77,6 +81,9 @@ class RedisTaskStorage implements TaskStorage // Otherwise attempt authentication, if needed if (isset($parameters['password']) && !$this->conn->auth($parameters['password'])) throw new TasksException("Could not construct RedisTaskStorage. Authentication failure."); + + // And select the DB index + $this->conn->select($parameters['db_index']); } catch (RedisException $e) { throw new TasksException("Could not construct RedisTaskStorage. RedisException thrown: '" . $e->getMessage() . "'"); } @@ -98,8 +105,15 @@ class RedisTaskStorage implements TaskStorage // Serialize the task and save it $taskData = serialize($task); - $this->conn->set($this->key_prefix . $taskId, $taskData); + + // Register the task $this->conn->sAdd($this->indexSet, $taskId); + if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED) + $this->conn->sAdd($this->unfinishedSet, $taskId); + + // And create a hash for it + if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) + return false; return true; } @@ -107,23 +121,18 @@ class RedisTaskStorage implements TaskStorage /** * @inheritDoc */ - public function readTasks(): array - { - return $this->refreshTasks(); - } - - /** - * @inheritDoc - */ - public function refreshTasks() + public function readTasks(bool $noIncludeDone = false): array { // First fetch an array of all tasks in the set - $taskList = $this->conn->sMembers($this->indexSet); + if ($noIncludeDone) + $taskList = $this->conn->sMembers($this->unfinishedSet); + else + $taskList = $this->conn->sMembers($this->indexSet); // Go over each taskId and fetch the specific task $tasks = []; foreach ($taskList as $taskId) - $tasks[] = unserialize($this->conn->get($this->key_prefix . $taskId)); + $tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data')); return $tasks; } @@ -140,7 +149,7 @@ class RedisTaskStorage implements TaskStorage // Fetch the task /** @var Task $task */ - $task = unserialize($this->conn->get($this->key_prefix . $identifier)); + $task = unserialize($this->conn->hGet($this->key_prefix . $identifier, 'data')); // Return the task return $task; @@ -157,11 +166,29 @@ class RedisTaskStorage implements TaskStorage // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) - throw new TasksException("Could not modify task. Task '$taskId' already exists."); + throw new TasksException("Could not modify task. Task '$taskId' does not exists."); + + // Fire the TaskModifyEvent + /** @var TaskModifyEvent $event */ + $event = Events::fireEvent(new TaskModifyEvent(), $task); + if ($event->isCancelled()) + { + Logger::log("Did not modify task. Cancelled by taskModifyEvent."); + return false; + } // And write the data - $taskData = serialize($task); - return $this->conn->set($this->key_prefix . $taskId, $taskData); + $taskData = serialize($event->getTask()); + if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) + return false; + + // Modify the unfinished set + if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED )) + $this->conn->sRem($this->unfinishedSet, $taskId); + elseif (!$this->conn->sIsMember($this->unfinishedSet, $taskId) && $task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED) + $this->conn->sAdd($this->unfinishedSet, $taskId); + + return true; } /** @@ -176,24 +203,44 @@ class RedisTaskStorage implements TaskStorage // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) - throw new TasksException("Could not modify task. Task '$taskId' already exists."); + throw new TasksException("Could not delete task. Task '$taskId' does not exists."); - // Delete the key - $this->conn->del($this->key_prefix . $taskId); + // Delete the task from the index $this->conn->sRem($this->indexSet, $taskId); - // Remove all task output and post output - $settings = $task->getRetrySettings(); - $maxRetries = $settings['maxRetries']; - for ($i=0;$i<=$maxRetries;$i++) - { - // First remove all possible task output - if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $i)) - $this->conn->del($this->key_prefix . $taskId . '_output_' . $i); + // And remove the task from the unfinishedSet + if ($this->conn->sIsMember($this->unfinishedSet, $taskId)) + $this->conn->sRem($this->unfinishedSet, $taskId); - if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $i)) - $this->conn->del($this->key_prefix . $taskId . '_post_' . $i); - } + // Delete the task itself + if ($this->conn->del($this->key_prefix . $taskId) > 0) + return true; + + return false; + } + + /** + * @inheritDoc + */ + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the task exists + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not write task output. Task '$taskId' not found."); + + // Prepare the data + $contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; + + // Determine the attempt number + $attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskOutputAttempts', 1); + + // Then write this output + if ($this->conn->hSet($this->key_prefix . $taskId, 'output' . $attempt, serialize($contents)) === FALSE) + return false; return true; } @@ -201,41 +248,27 @@ class RedisTaskStorage implements TaskStorage /** * @inheritDoc */ - public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool { // First get the task ID $taskId = $task->getId(); - // Check if the key already exists - if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) - throw new TasksException("Could not write task output. Output already written."); + // Check if the task exists + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not write post output. Task '$taskId' not found."); - // Prepare contents - $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - $data = serialize($contents); + // Prepare the data + $contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - // Write contents - return $this->conn->set($this->key_prefix . $taskId . '_output_' . $attempt, $data); - } + // Determine the attempt number + $attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskPostAttempts', 1); - /** - * @inheritDoc - */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool - { - // First get the task ID - $taskId = $task->getId(); + // Then write this output + if ($this->conn->hSet($this->key_prefix . $taskId, 'postOutput' . $attempt, serialize($contents)) === FALSE) + return false; - // Check if the key already exists - if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) - throw new TasksException("Could not write post output. Output already written."); - - // Prepare contents - $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - $data = serialize($contents); - - // Write contents - return $this->conn->set($this->key_prefix . $taskId . '_post_' . $attempt, $data); + return true; } /** @@ -246,15 +279,47 @@ class RedisTaskStorage implements TaskStorage // First get the task ID $taskId = $task->getId(); - // Check if the key already exists - if (!$this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) - return null; + // If a nothing in particular is requested, fetch the latest and select that as the attempt + if ($attempt === 0) + $attempt = $this->conn->hGet($this->key_prefix . $taskId, 'taskOutputAttempts'); - // Load and convert the data - $data = $this->conn->get($this->key_prefix . $taskId . '_output_' . $attempt); - $data = unserialize($data); + // If -1 is requested, fetch all + if ($attempt === -1) + { + // Get amount of attempts + $totalAttempts = $this->conn->hGet($this->key_prefix . $taskId, 'taskOutputAttempts'); + $output = []; + for ($i=1;$i<=$totalAttempts;$i++) + { + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'output' . $i)) + $output[] = null; - return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + // Load and convert the data + $data = $this->conn->hGet($this->key_prefix . $taskId, 'output' . $i); + $data = unserialize($data); + + $output[] = ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } + + if (!empty($output)) + return $output; + } + // If a specific one is requested, fetch that one + else + { + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'output' . $attempt)) + return null; + + // Load and convert the data + $data = $this->conn->hGet($this->key_prefix . $taskId, 'output' . $attempt); + $data = unserialize($data); + + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } + + return null; } /** @@ -265,14 +330,55 @@ class RedisTaskStorage implements TaskStorage // First get the task ID $taskId = $task->getId(); - // Check if the key already exists - if (!$this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) - return null; + // If a nothing in particular is requested, fetch the latest and select that as the attempt + if ($attempt === 0) + $attempt = $this->conn->hGet($this->key_prefix . $taskId, 'taskPostAttempts'); - // Load and convert the data - $data = $this->conn->get($this->key_prefix . $taskId . '_post_' . $attempt); - $data = unserialize($data); + // If -1 is requested, fetch all + if ($attempt === -1) + { + // Get amount of attempts + $totalAttempts = $this->conn->hGet($this->key_prefix . $taskId, 'taskPostAttempts'); + $output = []; + for ($i=1;$i<=$totalAttempts;$i++) + { + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'postOutput' . $i)) + $output[] = null; - return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + // Load and convert the data + $data = $this->conn->hGet($this->key_prefix . $taskId, 'postOutput' . $i); + $data = unserialize($data); + + $output[] = ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } + + if (!empty($output)) + return $output; + } + // If a specific one is requested, fetch that one + else + { + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'postOutput' . $attempt)) + return null; + + // Load and convert the data + $data = $this->conn->hGet($this->key_prefix . $taskId, 'postOutput' . $attempt); + $data = unserialize($data); + + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } + + return null; + } + + /** + * @inheritDoc + */ + public function reset(): bool + { + // Clear the current db + return $this->conn->flushDB(); } } \ No newline at end of file diff --git a/src/FuzeWorks/Async/Tasks.php b/src/FuzeWorks/Async/Tasks.php index e3ebae6..9875d12 100644 --- a/src/FuzeWorks/Async/Tasks.php +++ b/src/FuzeWorks/Async/Tasks.php @@ -50,6 +50,26 @@ class Tasks implements iLibrary */ private $cfg; + /** + * @var TaskStorage + */ + private $taskStorage; + + /** + * @var Executor + */ + private $executor; + + /** + * @var SuperVisor + */ + private $supervisor; + + /** + * @var ShellWorker + */ + private $shellWorker; + /** * Tasks constructor. * @@ -59,7 +79,6 @@ class Tasks implements iLibrary { /** @var Config $config */ $config = Factory::getInstance('config'); - $config->addComponentPath(dirname(__FILE__, 4), Priority::LOW); $this->cfg = $config->getConfig('tasks'); } @@ -75,23 +94,39 @@ class Tasks implements iLibrary } /** - * @param string $bootstrapFile * @return SuperVisor * @throws TasksException */ - public function getSuperVisor(string $bootstrapFile): SuperVisor + public function getSuperVisor(): SuperVisor { + if (isset($this->supervisor)) + return $this->supervisor; + + // First get the configuration for SuperVisors $cfg = $this->cfg->get('SuperVisor'); - $class = 'FuzeWorks\Async\Supervisors\\' . $cfg['type']; - $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; - array_unshift($parameters, $this->getTaskStorage(), $this->getExecutor($bootstrapFile)); + + // Select the SuperVisor type + $type = $cfg['type']; + + // Load the class of the currently selected type + $class = 'FuzeWorks\Async\Supervisors\\' . $type; + + // Fetch the parameters for the selected SuperVisor + $parameters = isset($cfg[$type]['parameters']) && is_array($cfg[$type]['parameters']) ? $cfg[$type]['parameters'] : []; + + // Then add the TaskStorage and Executor to the parameters + array_unshift($parameters, $this->getTaskStorage(), $this->getExecutor()); + + // If the type does not exist, throw an exception if (!class_exists($class, true)) throw new TasksException("Could not get SuperVisor. Type of '$class' not found."); + // And load the SuperVisor and test if everything is in order $object = new $class(...$parameters); if (!$object instanceof SuperVisor) throw new TasksException("Could not get SuperVisor. Type of '$class' is not instanceof TaskStorage."); + $this->supervisor = $object; return $object; } @@ -101,7 +136,11 @@ class Tasks implements iLibrary */ public function getWorker(): ShellWorker { - return new ShellWorker($this->getTaskStorage()); + if (isset($this->shellWorker)) + return $this->shellWorker; + + $this->shellWorker = new ShellWorker($this->getTaskStorage()); + return $this->shellWorker; } /** @@ -112,39 +151,68 @@ class Tasks implements iLibrary */ public function getTaskStorage(): TaskStorage { + if (isset($this->taskStorage)) + return $this->taskStorage; + + // First get the configuration for TaskStorage $cfg = $this->cfg->get('TaskStorage'); - $class = 'FuzeWorks\Async\TaskStorage\\' . $cfg['type']; - $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; + + // Select the TaskStorage type + $type = $cfg['type']; + + // Load the class of the currently selected type + $class = 'FuzeWorks\Async\TaskStorage\\' . $type; + + // Fetch the parameters for the selected type + $parameters = isset($cfg[$type]['parameters']) && is_array($cfg[$type]['parameters']) ? $cfg[$type]['parameters'] : []; + + // If the type does not exist, throw an exception if (!class_exists($class, true)) throw new TasksException("Could not get TaskStorage. Type of '$class' not found."); + // And load the TaskStorage and test if everything is in order $object = new $class($parameters); if (!$object instanceof TaskStorage) throw new TasksException("Could not get TaskStorage. Type '$class' is not instanceof TaskStorage."); - return $object; + $this->taskStorage = $object; + return $this->taskStorage; } /** * Fetch the Executor based on the configured type * - * @param string $bootstrapFile * @return Executor * @throws TasksException */ - protected function getExecutor(string $bootstrapFile): Executor + protected function getExecutor(): Executor { + if (isset($this->executor)) + return $this->executor; + + // First get the configuration for Executor $cfg = $this->cfg->get('Executor'); - $class = 'FuzeWorks\Async\Executors\\' . $cfg['type']; - $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; + + // Select the Executor type + $type = $cfg['type']; + + // Load the class of the currently selected type + $class = 'FuzeWorks\Async\Executors\\' . $type; + + // Fetch the parameters for the selected type + $parameters = isset($cfg[$type]['parameters']) && is_array($cfg[$type]['parameters']) ? $cfg[$type]['parameters'] : []; + + // If the type does not exist, throw an exception if (!class_exists($class, true)) throw new TasksException("Could not get Executor. Type of '$class' not found."); - $object = new $class($bootstrapFile, $parameters); + // And load the Executor and test if everything is in order + $object = new $class($parameters); if (!$object instanceof Executor) throw new TasksException("Could not get Executor. Type '$class' is not instanceof Executor."); - return $object; + $this->executor = $object; + return $this->executor; } /** diff --git a/test/base/ControllerHandlerTest.php b/test/base/ControllerHandlerTest.php new file mode 100644 index 0000000..f296f42 --- /dev/null +++ b/test/base/ControllerHandlerTest.php @@ -0,0 +1,195 @@ +tasks = Factory::getInstance('libraries')->get('async'); + $this->taskStorage = $this->tasks->getTaskStorage(); + $this->taskStorage->reset(); + + // Reset events + Events::$listeners = []; + } + + /* ---------------------------------- Test the class itself --------------------------- */ + + public function testParametersAndClass() + { + // Create a test class + $handler = new ControllerHandler('TestController', 'testMethod', 'testPostMethod', '\Test\Namespace\\'); + $parentHandler = new EmptyHandler(); + + // Set some parameters + $handler->setParentHandler($parentHandler); + $handler->setParentInput('Some Parent Input'); + + // And test return values + $this->assertInstanceOf(ControllerHandler::class, $handler); + $this->assertSame($parentHandler, $handler->getParentHandler()); + } + + public function testEmptyController() + { + // Create the handler + $handler = new ControllerHandler('empty', 'primary', 'post', '\Mock\Controllers\\'); + $handler->setParentInput('Some input'); + + // And create the dummy Task + $dummyTask = new Task('testEmptyController', $handler, true, 'para1', 'para2'); + + // Write the task to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Get the SuperVisor and start running the build + $superVisor = $this->tasks->getSuperVisor(); + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::RUNNING, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Give the task some time to finish + usleep(750000); + + // Assert that the task is now waiting in POST + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::SUCCESS, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Cycle again so it goes into POST mode + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::POST, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Give the task some extra time to finish + usleep(750000); + + // Cycle again so it goes into POST mode + $this->assertEquals(SuperVisor::FINISHED, $superVisor->cycle()); + $this->assertEquals(Task::COMPLETED, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Now that the task is finished, let's see if the results match expectations + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $this->assertEquals('Primary success: para1 + para2 + Some input', $dummyTask->getOutput()); + $this->assertEquals('Post success: testEmptyController', $dummyTask->getPostOutput()); + } + + /** + * @depends testEmptyController + */ + public function testFailingController() + { + // Create the handler + $handler = new ControllerHandler('failing', 'primary', 'post', '\Mock\Controllers\\'); + + // And create the dummy Task + $dummyTask = new Task('testFailingController', $handler, true, 'para1', 'para2'); + + // Set this task to not retry + $dummyTask->setSettings(false); + + // Write the task to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Get the SuperVisor and start running the build + $superVisor = $this->tasks->getSuperVisor(); + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::RUNNING, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Give the task some time to finish + usleep(750000); + + // Assert that the task is now waiting in POST + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::FAILED, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // The task should not retry and move to POST now + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::POST, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Give the task some extra time to finish + usleep(750000); + + // The task should not retry and move to POST now + $this->assertEquals(SuperVisor::FINISHED, $superVisor->cycle()); + $this->assertEquals(Task::CANCELLED, + $this->taskStorage->getTaskById($dummyTask->getId())->getStatus() + ); + + // Now that the task is finished, let's see if the results match expectations + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $this->assertEquals('Primary success: para1 + para2', $dummyTask->getOutput()); + $this->assertEquals('Post success: testFailingController', $dummyTask->getPostOutput()); + $this->assertEquals('ERROR \'Logged some task error!\'', $dummyTask->getErrors()); + $this->assertEquals('ERROR \'Logged some post error!\'', $dummyTask->getPostErrors()); + } + + +} diff --git a/test/base/DependenciesTest.php b/test/base/DependenciesTest.php new file mode 100644 index 0000000..6cd00d7 --- /dev/null +++ b/test/base/DependenciesTest.php @@ -0,0 +1,453 @@ +tasks = Factory::getInstance('libraries')->get('async'); + $this->taskStorage = $this->tasks->getTaskStorage(); + $this->taskStorage->reset(); + + // Reset events + Events::$listeners = []; + } + + /* ---------------------------------- Test the dependency constraint ------------------ */ + + public function testHasConstrainedDeps() + { + // Create the dependent tasks + $depTask1 = new Task('depTask1', new EmptyHandler()); + $depTask2 = new Task('depTask2', new EmptyHandler()); + + // Write those dependencies to TaskStorage + $this->taskStorage->addTask($depTask1); + $this->taskStorage->addTask($depTask2); + + // Create the constraint + $constraint = new DependencyConstraint(['depTask1', 'depTask2']); + + // And a dummyTask to accompany + $dummyTask = new Task('dependentTask', new EmptyHandler()); + $dummyTask->addConstraint($constraint); + + // Test that the constraint is the same + $this->assertSame([$constraint], $dummyTask->getConstraints()); + + // And test the intervention + $this->assertTrue($constraint->intervene($dummyTask)); + $this->assertEquals(Task::DELAYED, $constraint->blockCode()); + $this->assertEquals(time() + 3, $constraint->delayTime()); + } + + /** + * @depends testHasConstrainedDeps + */ + public function testDelayTimes() + { + // Create the dependent tasks + $depTask1 = new Task('depTask1', new EmptyHandler()); + $depTask2 = new Task('depTask2', new EmptyHandler()); + + // Write those dependencies to TaskStorage + $this->taskStorage->addTask($depTask1); + $this->taskStorage->addTask($depTask2); + + // Create some useless dummy task + $dummyTask = new Task('dependentTask', new EmptyHandler()); + + // Create the constraints + // Default time + $constraintDef = new DependencyConstraint(['depTask1', 'depTask2']); + + // Modified time (30) + $constraintMod1 = new DependencyConstraint(['depTask1', 'depTask2'], 30); + + // And another (60) + $constraintMod2 = new DependencyConstraint(['depTask1', 'depTask2'], 60); + + // And intervene all of them + $this->assertTrue($constraintDef->intervene($dummyTask)); + $this->assertTrue($constraintMod1->intervene($dummyTask)); + $this->assertTrue($constraintMod2->intervene($dummyTask)); + + // And check the results + $this->assertEquals(Task::DELAYED, $constraintDef->blockCode()); + $this->assertEquals(Task::DELAYED, $constraintMod1->blockCode()); + $this->assertEquals(Task::DELAYED, $constraintMod2->blockCode()); + $this->assertEquals(time() + 3, $constraintDef->delayTime()); + $this->assertEquals(time() + 30, $constraintMod1->delayTime()); + $this->assertEquals(time() + 60, $constraintMod2->delayTime()); + } + + public function testHasFailedDeps() + { + // Create the dependent tasks + $depTask1 = new Task('depTask1', new EmptyHandler()); + $depTask2 = new Task('depTask2', new EmptyHandler()); + + // And set the first as completed, and second as failed + $depTask1->setStatus(Task::COMPLETED); + $depTask2->setStatus(Task::CANCELLED); + + // Write those dependencies to TaskStorage + $this->taskStorage->addTask($depTask1); + $this->taskStorage->addTask($depTask2); + + // Create the constraint + $constraint = new DependencyConstraint(['depTask1', 'depTask2']); + + // And a dummyTask to accompany + $dummyTask = new Task('dependentTask', new EmptyHandler()); + $dummyTask->addConstraint($constraint); + + // Test that the constraint is the same + $this->assertSame([$constraint], $dummyTask->getConstraints()); + + // And test the intervention + $this->assertTrue($constraint->intervene($dummyTask)); + $this->assertEquals(Task::CANCELLED, $constraint->blockCode()); + $this->assertEquals('Task cancelled due to failed dependency.', $dummyTask->getErrors()); + } + + public function testHasCompletedDeps() + { + // Create the dependent tasks + $depTask1 = new Task('depTask1', new EmptyHandler()); + $depTask2 = new Task('depTask2', new EmptyHandler()); + + // And set the first as completed, and second as failed + $depTask1->setStatus(Task::COMPLETED); + $depTask2->setStatus(Task::COMPLETED); + + // Write those dependencies to TaskStorage + $this->taskStorage->addTask($depTask1); + $this->taskStorage->addTask($depTask2); + + // Create the constraint + $constraint = new DependencyConstraint(['depTask1', 'depTask2']); + + // And a dummyTask to accompany + $dummyTask = new Task('dependentTask', new EmptyHandler()); + $dummyTask->addConstraint($constraint); + + // Test that the constraint is the same + $this->assertSame([$constraint], $dummyTask->getConstraints()); + + // And test the intervention + $this->assertFalse($constraint->intervene($dummyTask)); + } + + public function testGetDependencies() + { + $constraint = new DependencyConstraint(['someTask1', 'someTask2']); + $this->assertEquals(['someTask1', 'someTask2'], $constraint->getDependencies()); + } + + /* ---------------------------------- Test the dependent task handler ----------------- */ + + public function testAddedDependencies() + { + $handler = new DependentTaskHandler(['someTask1', 'someTask2']); + $dummyTask = new Task('someTask', $handler); + + // Check that the constraints match expectations + /** @var DependencyConstraint[] $constraints */ + $constraints = $dummyTask->getConstraints(); + $this->assertInstanceOf(DependencyConstraint::class, $constraints[0]); + + // And that the dependencies match + $this->assertEquals(['someTask1', 'someTask2'], $constraints[0]->getDependencies()); + } + + public function testPassingOutput() + { + // Create the dependent tasks + $depTask1 = new Task('someTask', new EmptyHandler()); + $depTask2 = new Task('someTask2', new EmptyHandler()); + + // Give the dependencies some output + $depTask1->setOutput('First Output', ''); + $depTask2->setOutput('Second Output', ''); + + // Write those to TaskStorage + $this->taskStorage->addTask($depTask1); + $this->taskStorage->addTask($depTask2); + + // Create the task + $handler = new DependentTaskHandler(['someTask', 'someTask2']); + + // Create a dummy Task + $dummyTask = new Task('someTask3', $handler); + + // Assert that all is well + $this->assertTrue($handler->primaryHandler($dummyTask)); + + // And test the handler's output + $this->assertEquals(json_encode([ + 'someTask' => [ + 'status' => Task::PENDING, + 'output' => 'First Output', + 'errors' => '', + 'post' => null, + 'postErrors' => null + ], + 'someTask2' => [ + 'status' => Task::PENDING, + 'output' => 'Second Output', + 'errors' => '', + 'post' => null, + 'postErrors' => null + ] + ]), $handler->getOutput()); + + // And test the post handler + $this->assertTrue($handler->postHandler($dummyTask)); + $this->assertEquals(json_encode([ + 'someTask' => [ + 'status' => Task::PENDING, + 'output' => 'First Output', + 'errors' => '', + 'post' => null, + 'postErrors' => null + ], + 'someTask2' => [ + 'status' => Task::PENDING, + 'output' => 'Second Output', + 'errors' => '', + 'post' => null, + 'postErrors' => null + ] + ]), $handler->getPostOutput()); + } + + /** + * @depends testPassingOutput + */ + public function testMissingDependency() + { + // Create the task + $handler = new DependentTaskHandler(['someTask']); + + // Create a dummy Task + $dummyTask = new Task('someTask2', $handler); + + // Assert that all is well + $this->assertTrue($handler->primaryHandler($dummyTask)); + + // And test the handler's output + $this->assertEquals(json_encode([ + 'someTask' => [ + 'status' => Task::FAILED, + 'output' => null, + 'errors' => 'Task not found.', + 'post' => null, + 'postErrors' => null + ], + ]), $handler->getOutput()); + + // And test the post handler + $this->assertTrue($handler->postHandler($dummyTask)); + $this->assertEquals(json_encode([ + 'someTask' => [ + 'status' => Task::FAILED, + 'output' => null, + 'errors' => 'Task not found.', + 'post' => null, + 'postErrors' => null + ], + ]), $handler->getPostOutput()); + } + + /** + * @depends testPassingOutput + */ + public function testNoDepedencies() + { + // Create the task + $handler = new DependentTaskHandler([]); + + // Create a dummy Task + $dummyTask = new Task('someTask', $handler); + + // Assert that all is well + $this->assertTrue($handler->primaryHandler($dummyTask)); + $this->assertEquals(json_encode([]), $handler->getOutput()); + + // And test the post handler + $this->assertTrue($handler->postHandler($dummyTask)); + $this->assertEquals(json_encode([]), $handler->getPostOutput()); + } + + public function testParentHandler() + { + // Test pass output + $handler = new DependentTaskHandler([]); + $handler->setParentInput('Passed Input'); + $this->assertEquals('Passed Input', $handler->getOutput()); + + // Test passing a handler + $handler = new DependentTaskHandler([]); + $parentHandler = $this->createMock(Handler::class); + $handler->setParentHandler($parentHandler); + $this->assertSame($parentHandler, $handler->getParentHandler()); + } + + public function testPassDependencyOutput() + { + // Build all systems for this test + $superVisor = $this->tasks->getSuperVisor(); + + // Create the dependency + $dependency = new Task('dependency', new ArgumentedHandler(0, 'Prepared Output')); + + // Write the task to TaskStorage + $this->taskStorage->addTask($dependency); + + // Now create the dependent task + $dependent = new Task('dependent', new DependentTaskHandler(['dependency'], 2)); + + // And write that task to TaskStorage + $this->taskStorage->addTask($dependent); + + // Now we make the SuperVisor cycle, to start the dependency and set the dependent to WAIT + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + + // Assert that everything is running + $this->assertEquals(Task::RUNNING, + $this->taskStorage->getTaskById($dependency->getId())->getStatus() + ); + $this->assertEquals(Task::DELAYED, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Give the task some time to finish + usleep(500000); + + // And re-run the SuperVisor + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + + // Now check the tasks again. Dependency should be finished and have output, + // whereas dependent should still be delayed + $this->assertEquals(Task::SUCCESS, + $this->taskStorage->getTaskById($dependency->getId())->getStatus() + ); + $this->assertEquals(Task::DELAYED, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Cycle again and see the dependency be completed, and dependent still delayed + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::COMPLETED, + $this->taskStorage->getTaskById($dependency->getId())->getStatus() + ); + $this->assertEquals(Task::DELAYED, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Also check that output is correct + $this->assertEquals('Prepared Output', + $this->taskStorage->getTaskById($dependency->getId())->getOutput() + ); + + // Now wait long enough for the delay to be finished + usleep(2500000); + + // Now cycle again, and expect the task to be pending + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::PENDING, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Cycle again and expect it to be running + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::RUNNING, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Give the task some time to finish + usleep(500000); + + // And cycle again and expect the task to have succeeded + $this->assertEquals(SuperVisor::RUNNING, $superVisor->cycle()); + $this->assertEquals(Task::SUCCESS, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + + // Cycle again and expect the task to have completed, and test its output + $this->assertEquals(SuperVisor::FINISHED, $superVisor->cycle()); + $this->assertEquals(Task::COMPLETED, + $this->taskStorage->getTaskById($dependent->getId())->getStatus() + ); + $this->assertEquals( + json_encode(['dependency' => [ + 'status' => Task::COMPLETED, + 'output' => 'Prepared Output', + 'errors' => '', + 'post' => null, + 'postErrors' => null + ]]), + $this->taskStorage->getTaskById($dependent->getId())->getOutput() + ); + } +} diff --git a/test/base/ShellExecutorTest.php b/test/base/ShellExecutorTest.php new file mode 100644 index 0000000..eeb8f89 --- /dev/null +++ b/test/base/ShellExecutorTest.php @@ -0,0 +1,238 @@ +taskStorage = $tasks->getTaskStorage(); + $this->taskStorage->reset(); + + // And load the ShellExecutor using the execution settings + $this->executor = new ShellExecutor([ + 'bootstrapFile' => dirname(__DIR__) . DIRECTORY_SEPARATOR . 'bootstrap.php', + 'workerFile' => dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker' + ]); + } + + public function testClass() + { + $this->assertInstanceOf('FuzeWorks\Async\Executors\ShellExecutor', $this->executor); + } + + /** + * @depends testClass + */ + public function testNoWorkerFile() + { + $this->expectException(TasksException::class); + new ShellExecutor(['bootstrapFile' => dirname(__DIR__) . DIRECTORY_SEPARATOR . 'bootstrap.php']); + } + + /** + * @depends testClass + */ + public function testNoBoostrapFile() + { + $this->expectException(TasksException::class); + new ShellExecutor(['workerFile' => dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker']); + } + + /** + * @depends testClass + */ + public function testInvalidWorkerFile() + { + $this->expectException(TasksException::class); + new ShellExecutor([ + 'bootstrapFile' => dirname(__DIR__) . DIRECTORY_SEPARATOR . 'bootstrap.php', + 'workerFile' => 'not_found' + ]); + } + + /** + * @depends testClass + */ + public function testInvalidBootstrapFile() + { + $this->expectException(TasksException::class); + new ShellExecutor([ + 'bootstrapFile' => 'not_found', + 'workerFile' => dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker' + ]); + } + + /* ---------------------------------- Writing and reading tasks ----------------------- */ + + /** + * @depends testClass + * @covers ::startTask + * @covers ::getTaskRunning + */ + public function testStartAndReadTasks() + { + // First we create a dummy task + $dummyTask = new Task('testStartAndReadTasks', new TestStartAndReadTasksHandler()); + + // Then we write this task to the TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Assert that no PID exists yet + $this->assertNull($dummyTask->attribute('pid')); + + // Then we fire the task + $task = $this->executor->startTask($dummyTask); + + // Pause 1/10th of a second + usleep(500000); + + // Assert that the output is the same + $this->assertSame($dummyTask, $task); + + // Also assert that a PID has been added + $this->assertIsInt($task->attribute('pid')); + + // Also assert that the task is currently running + $this->assertTrue($this->executor->getTaskRunning($task)); + } + + /** + * @depends testStartAndReadTasks + */ + public function testGetStats() + { + // First we create a dummy task, using the previous handler since nothing changes + $dummyTask = new Task('testGetStats', new TestStartAndReadTasksHandler()); + + // Then we write this task to the TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Then we start the task + $dummyTask = $this->executor->startTask($dummyTask); + + // Pause 1/10th of a second + usleep(500000); + + // And we fetch some task statistics + $stats = $this->executor->getTaskStats($dummyTask); + + // Assert some assumptions + $this->assertNotNull($stats); + $this->assertIsInt($stats['pid']); + $this->assertIsFloat($stats['cpu']); + $this->assertIsFloat($stats['mem']); + $this->assertIsString($stats['state']); + $this->assertIsString($stats['start']); + } + + /** + * @depends testGetStats + */ + public function testGetStatsNotExist() + { + // First we create a dummy task, using the previous handler since nothing changes + $dummyTask = new Task('testGetStatsNotExist', new EmptyHandler()); + + // And add a fake PID, since otherwise it will immediately fail + $dummyTask->addAttribute('pid', 1005); + + // Then we fetch the process details + $this->assertNull($this->executor->getTaskStats($dummyTask)); + } + + /** + * @depends testStartAndReadTasks + */ + public function testStopTask() + { + // First we create a dummy task + $dummyTask = new Task('testStopTask', new TestStopTaskHandler()); + + // Then we write this task to the TaskStorage + $this->taskStorage->addTask($dummyTask); + + // First we start the task and confirm its running + $dummyTask = $this->executor->startTask($dummyTask); + + // Pause 1/10th of a second + usleep(500000); + + // Check if the task is running + $this->assertTrue($this->executor->getTaskRunning($dummyTask)); + + // But then we try and stop it + $output = $this->executor->stopTask($dummyTask); + + // Pause 1/10th of a second + usleep(500000); + + // We check that the output actually is the task + $this->assertSame($dummyTask, $output); + + // And check if the Task has actually stopped now + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + } + +} diff --git a/test/base/ShellWorkerTest.php b/test/base/ShellWorkerTest.php new file mode 100644 index 0000000..c56cb40 --- /dev/null +++ b/test/base/ShellWorkerTest.php @@ -0,0 +1,239 @@ +taskStorage = new TaskStorage\DummyTaskStorage([]); + $this->shellWorker = new ShellWorker($this->taskStorage); + $this->taskStorage->reset(); + + // Clear events + Events::$listeners = []; + } + + public function testClass() + { + $this->assertInstanceOf(ShellWorker::class, $this->shellWorker); + } + + /* ---------------------------------- Writing and reading tasks ----------------------- */ + + // @todo Add lots of tests and amend ShellWorker to return results + + /** + * @depends testClass + */ + public function testUseHandler() + { + // First prepare a Mock Handler + $mockHandler = $this->createMock(Handler::class); + $mockHandler->expects($this->exactly(2))->method('getParentHandler')->willReturn(null); + $mockHandler->expects($this->once()) + ->method('primaryHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(true); + $mockHandler->expects($this->once()) + ->method('postHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(true); + $mockHandler->expects($this->once())->method('getOutput')->willReturn('Some Output!'); + $mockHandler->expects($this->once())->method('getPostOutput')->willReturn('Post Output!'); + + // Create a Dummy task + $dummyTask = new Task('testUseHandler', $mockHandler); + $this->taskStorage->addTask($dummyTask); + + // Run the task in ShellWorker + $this->shellWorker->run($dummyTask, false); + + // And verify if the Output is correctly set + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('Some Output!', $output['output']); + $this->assertEquals(Task::SUCCESS, $output['statusCode']); + + // And run the post handler + $this->shellWorker->run($dummyTask, true); + $output = $this->taskStorage->readPostOutput($dummyTask, 1); + $this->assertEquals('Post Output!', $output['output']); + $this->assertEquals(Task::SUCCESS, $output['statusCode']); + } + + /** + * @depends testUseHandler + */ + public function testFailingHandlers() + { + $mockHandler = $this->createMock(Handler::class); + $mockHandler->expects($this->once()) + ->method('primaryHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(false); + $mockHandler->expects($this->once()) + ->method('postHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(false); + + // Create a Dummy task + $dummyTask = new Task('testFailingHandlers1', $mockHandler); + $this->taskStorage->addTask($dummyTask); + + // Run the task in ShellWorker + $this->shellWorker->run($dummyTask, false); + + // And verify if the Output is correctly set + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('', $output['output']); + $this->assertEquals(Task::FAILED, $output['statusCode']); + + // And run a post failure + $this->shellWorker->run($dummyTask, true); + $output = $this->taskStorage->readPostOutput($dummyTask, 1); + $this->assertEquals('', $output['output']); + $this->assertEquals(Task::FAILED, $output['statusCode']); + } + + /** + * @depends testUseHandler + */ + public function testParentHandlers() + { + // First create the Handlers + $parentHandler = $this->createMock(Handler::class); + $childHandler = $this->createMock(Handler::class); + + // Prepare parent handler output + $parentHandler->expects($this->once()) + ->method('primaryHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(true); + $parentHandler->expects($this->once()) + ->method('getOutput') + ->willReturn('Parent Output'); + + // Prepare the child handler + $childHandler->expects($this->once()) + ->method('getParentHandler') + ->willReturn($parentHandler); + $childHandler->expects($this->once()) + ->method('setParentInput') + ->with($this->equalTo('Parent Output')); + $childHandler->expects($this->once()) + ->method('primaryHandler') + ->with($this->callback(function($subject){return $subject instanceof Task;})) + ->willReturn(true); + $childHandler->expects($this->once()) + ->method('getOutput') + ->willReturn('Child Output'); + + // Set the relation + $childHandler->setParentHandler($parentHandler); + + // Create the Dummy Task + $dummyTask = new Task('testParentHandlers', $childHandler); + $this->taskStorage->addTask($dummyTask); + + // Run the task in ShellWorker + $this->shellWorker->run($dummyTask, false); + + // And verify if the Output is correctly set + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('Child Output', $output['output']); + $this->assertEquals(Task::SUCCESS, $output['statusCode']); + } + + /** + * @depends testParentHandlers + */ + public function testCascadingParentFailure() + { + // First create the Handlers + $parentHandler = $this->createMock(Handler::class); + $childHandler = $this->createMock(Handler::class); + + // Set the relation + $childHandler->setParentHandler($parentHandler); + $childHandler->expects($this->once()) + ->method('getParentHandler') + ->willReturn($parentHandler); + + // Set the results + $parentHandler->expects($this->once()) + ->method('primaryHandler') + ->willReturn(false); + $childHandler->expects($this->never()) + ->method('primaryHandler') + ->willReturn(true); + + // And some methods which shall be called in the end + $childHandler->expects($this->once()) + ->method('getOutput') + ->willReturn('Task failed successfully'); + + // Create the task to run this + $dummyTask = new Task('testCascadingParentFailure', $childHandler); + $this->taskStorage->addTask($dummyTask); + + // Run the task in ShellWorker + $this->shellWorker->run($dummyTask, false); + + // And verify whether the task has indeed failed + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('Task failed successfully', $output['output']); + $this->assertEquals(Task::FAILED, $output['statusCode']); + } +} diff --git a/test/base/TaskStorageTest.php b/test/base/TaskStorageTest.php new file mode 100644 index 0000000..d3e4b6d --- /dev/null +++ b/test/base/TaskStorageTest.php @@ -0,0 +1,577 @@ +taskStorage = $tasks->getTaskStorage(); + $this->taskStorage->reset(); + + // Reset events + Events::$listeners = []; + } + + public function testDummyTaskStorageClass() + { + $this->assertInstanceOf('FuzeWorks\Async\TaskStorage', $this->taskStorage); + } + + /* ---------------------------------- Writing and reading tasks ----------------------- */ + + /** + * @depends testDummyTaskStorageClass + */ + public function testAddAndReadTasks() + { + // Prepare a dummy task + $dummyTask = new Task('testAddTask', new EmptyHandler()); + + // Nothing is written yet so it should be empty + $this->assertEmpty($this->taskStorage->readTasks()); + + // Write task to storage and test properties of readTasks + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + $output = $this->taskStorage->readTasks(); + $this->assertCount(1, $output); + + // Get first + $task = $output[0]; + $this->assertEquals($dummyTask, $task); + + // Test if the properties match + $this->assertEquals('testAddTask', $task->getId()); + $this->assertInstanceOf(EmptyHandler::class, $task->getHandler()); + } + + /** + * @depends testAddAndReadTasks + */ + public function testReadUnfinishedTasks() + { + // Add the tasks + $finishedTask = new Task('finishedTask', new EmptyHandler()); + $finishedTask->setStatus(Task::COMPLETED); + $unfinishedTask1 = new Task('unfinishedTask1', new EmptyHandler()); + $unfinishedTask2 = new Task('unfinishedTask2', new EmptyHandler()); + + // Nothing is written yet so it should be empty + $this->assertEmpty($this->taskStorage->readTasks()); + + // Write the tasks to TaskStorage + $this->taskStorage->addTask($finishedTask); + $this->taskStorage->addTask($unfinishedTask1); + $this->taskStorage->addTask($unfinishedTask2); + + // And check whether they get properly read + $this->assertCount(3, $this->taskStorage->readTasks()); + + // And whether the finished task gets omitted in the unfinished list + $this->assertCount(2, $this->taskStorage->readTasks(true)); + } + + /** + * @depends testAddAndReadTasks + */ + public function testAddExistingTask() + { + // Prepare a dummy task + $dummyTask = new Task('testAddExistingTask', new EmptyHandler()); + + // First check that the task storage starts empty + $this->assertEmpty($this->taskStorage->readTasks()); + + // Then add the first task + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + + // But then add another task, which should raise an exception + $this->expectException(TasksException::class); + $this->taskStorage->addTask($dummyTask); + } + + /** + * @depends testAddAndReadTasks + */ + public function testGetTaskById() + { + // Prepare a dummy task + $dummyTask1 = new Task('testGetTaskById1', new EmptyHandler()); + $dummyTask2 = new Task('testGetTaskById2', new EmptyHandler()); + + // First we add both tasks + $this->assertEmpty($this->taskStorage->readTasks()); + $this->assertTrue($this->taskStorage->addTask($dummyTask1)); + $this->assertTrue($this->taskStorage->addTask($dummyTask2)); + + // Afterwards, we attempt to get the separate tasks + $retrievedTask1 = $this->taskStorage->getTaskById('testGetTaskById1'); + $retrievedTask2 = $this->taskStorage->getTaskById('testGetTaskById2'); + $this->assertInstanceOf('FuzeWorks\Async\Task', $retrievedTask1); + $this->assertInstanceOf('FuzeWorks\Async\Task', $retrievedTask2); + + // Assert they have the values we seek + $this->assertEquals('testGetTaskById1', $retrievedTask1->getId()); + $this->assertEquals('testGetTaskById2', $retrievedTask2->getId()); + + // Test they are not the same + $this->assertNotEquals($retrievedTask1, $retrievedTask2); + + // And test they are the initial dummy tasks + $this->assertEquals($dummyTask1, $retrievedTask1); + $this->assertEquals($dummyTask2, $retrievedTask2); + } + + /** + * @depends testGetTaskById + */ + public function testGetTaskByIdNotFound() + { + // Prepare a dummy task + $dummyTask = new Task('testGetTaskByIdNotFound', new EmptyHandler()); + + // First we add the task + $this->assertEmpty($this->taskStorage->readTasks()); + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + + // Afterwards we check if we can get this task + $this->assertInstanceOf('FuzeWorks\Async\Task', $this->taskStorage->getTaskById('testGetTaskByIdNotFound')); + + // And afterwards we check if an exception is raised if none exist + $this->expectException(TasksException::class); + $this->taskStorage->getTaskById('DoesNotExist'); + } + + /** + * @depends testGetTaskById + */ + public function testModifyTask() + { + // Prepare a dummy task + $dummyTask = new Task('testModifyTask', new EmptyHandler()); + $dummyTask->setStatus(Task::RUNNING); + + // First we add the task + $this->assertEmpty($this->taskStorage->readTasks()); + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + + // Afterwards we check if this task has the known details + $this->assertEquals(Task::RUNNING, $this->taskStorage->getTaskById('testModifyTask')->getStatus()); + + // Then we change the task + $dummyTask->setStatus(Task::FAILED); + $this->assertTrue($this->taskStorage->modifyTask($dummyTask)); + + // And check if the details have been changed + $this->assertEquals(Task::FAILED, $this->taskStorage->getTaskById('testModifyTask')->getStatus()); + } + + /** + * @depends testModifyTask + */ + public function testModifyTaskNotFound() + { + // Prepare a dummy task + $dummyTask = new Task('testModifyTaskNotFound', new EmptyHandler()); + + // Attempt to change this task, which does not exist. + $this->expectException(TasksException::class); + $this->taskStorage->modifyTask($dummyTask); + } + + /** + * @depends testModifyTask + */ + public function testModifyTaskEvent() + { + // Prepare a dummy task + $dummyTask = new Task('testModifyTaskEvent', new EmptyHandler()); + $dummyTask->setStatus(Task::PENDING); + + // Then add the Task + $this->taskStorage->addTask($dummyTask); + + // Now prepare a listener to be fired + Events::addListener(function (TaskModifyEvent $event){ + $task = $event->getTask(); + $this->assertEquals(Task::PENDING, $task->getStatus()); + $task->setStatus(Task::RUNNING); + $event->updateTask($task); + return $event; + }, 'TaskModifyEvent', Priority::NORMAL); + + // Now update the task + $this->assertEquals(Task::PENDING, $dummyTask->getStatus()); + $this->taskStorage->modifyTask($dummyTask); + + // And check whether dummyTask got modified + $this->assertEquals(Task::RUNNING, $dummyTask->getStatus()); + + // And check whether the TaskStorage has this updated version as well + $modifiedTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $this->assertEquals(Task::RUNNING, $modifiedTask->getStatus()); + } + + /** + * @depends testModifyTaskEvent + */ + public function testModifyTaskCancel() + { + // Prepare a dummy task + $dummyTask = new Task('testModifyTaskCancel', new EmptyHandler()); + $dummyTask->setStatus(Task::PENDING); + + // Then add the Task + $this->taskStorage->addTask($dummyTask); + + // Now prepare a listener to be fired + Events::addListener(function (TaskModifyEvent $event){ + $event->setCancelled(true); + return $event; + }, 'TaskModifyEvent', Priority::NORMAL); + + // Modify the task + $dummyTask->setStatus(Task::SUCCESS); + $this->assertFalse($this->taskStorage->modifyTask($dummyTask)); + + // And check that the task actually hasn't updated + $modifiedTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $this->assertEquals(Task::PENDING, $modifiedTask->getStatus()); + } + + /** + * @depends testGetTaskById + */ + public function testDeleteTask() + { + // Prepare a dummy task + $dummyTask = new Task('testDeleteTask', new EmptyHandler()); + + // Add the task to the storage + $this->assertEmpty($this->taskStorage->readTasks()); + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + + // Test that it exists + $this->assertEquals($dummyTask, $this->taskStorage->getTaskById('testDeleteTask')); + + // Then remove the task + $this->assertTrue($this->taskStorage->deleteTask($dummyTask)); + + // And test that it can't be found + $this->expectException(TasksException::class); + $this->taskStorage->getTaskById('testDeleteTask'); + } + + /** + * @depends testDeleteTask + */ + public function testDeleteTaskNotFound() + { + // Prepare a dummy task + $dummyTask = new Task('testDeleteTaskNotFound', new EmptyHandler()); + + // Attempt to delete this task, which does not exist. + $this->expectException(TasksException::class); + $this->taskStorage->deleteTask($dummyTask); + } + + /* ---------------------------------- Writing and reading task output ----------------- */ + + /** + * @depends testDummyTaskStorageClass + */ + public function testWriteAndReadTaskOutput() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskOutput', new EmptyHandler()); + + // First write the task output + $this->taskStorage->addTask($dummyTask); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0)); + + // Then try to read the output + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('output', $output['output']); + $this->assertEquals('errors', $output['errors']); + $this->assertEquals(0, $output['statusCode']); + } + + /** + * @depends testWriteAndReadTaskOutput + */ + public function testWriteAndReadMultipleOutput() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadMultipleOutput', new EmptyHandler()); + + // Write some task output + $this->taskStorage->addTask($dummyTask); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 0)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 0)); + + // Then try to read all the output + $output = $this->taskStorage->readTaskOutput($dummyTask, -1); + $this->assertEquals([ + ['output' => 'output1', 'errors' => 'errors1', 'statusCode' => 0], + ['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 0] + ], $output); + + // Then try and read the latest + $this->assertEquals(['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 0], $this->taskStorage->readTaskOutput($dummyTask)); + } + + /** + * @depends testWriteAndReadTaskOutput + */ + public function testWriteAndReadTaskOutputTaskNotExist() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskOutputTaskNotExist', new EmptyHandler()); + + // Write output while the task does not exist yet, expect exception + $this->expectException(TasksException::class); + $this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0); + } + + /** + * @depends testWriteAndReadTaskOutput + */ + public function testWriteAndReadTaskOutputAttempts() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskOutputAttempts', new EmptyHandler()); + $this->taskStorage->addTask($dummyTask); + + // Write the different outputs. + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output0', 'errors0', 100)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 101)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 102)); + + // Attempt to load the first output + $output0 = $this->taskStorage->readTaskOutput($dummyTask, 1); + $this->assertEquals('output0', $output0['output']); + $this->assertEquals('errors0', $output0['errors']); + $this->assertEquals(100, $output0['statusCode']); + + // Attempt to load the second output + $output1 = $this->taskStorage->readTaskOutput($dummyTask, 2); + $this->assertEquals('output1', $output1['output']); + $this->assertEquals('errors1', $output1['errors']); + $this->assertEquals(101, $output1['statusCode']); + + // Attempt to load the third output + $output2 = $this->taskStorage->readTaskOutput($dummyTask, 3); + $this->assertEquals('output2', $output2['output']); + $this->assertEquals('errors2', $output2['errors']); + $this->assertEquals(102, $output2['statusCode']); + + // Attempt to load the default output + $output = $this->taskStorage->readTaskOutput($dummyTask); + $this->assertEquals('output2', $output['output']); + $this->assertEquals('errors2', $output['errors']); + $this->assertEquals(102, $output['statusCode']); + + // And to load all + $this->assertEquals([ + ['output' => 'output0', 'errors' => 'errors0', 'statusCode' => 100], + ['output' => 'output1', 'errors' => 'errors1', 'statusCode' => 101], + ['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 102] + ], $this->taskStorage->readTaskOutput($dummyTask, -1)); + } + + /** + * @depends testWriteAndReadTaskOutput + */ + public function testWriteAndReadTaskOutputNotExist() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskOutputNotExist', new EmptyHandler()); + $this->taskStorage->addTask($dummyTask); + + $this->assertNull($this->taskStorage->readTaskOutput($dummyTask)); + } + + /* ---------------------------------- Writing and reading task post output ------------ */ + + /** + * @depends testDummyTaskStorageClass + */ + public function testWriteAndReadTaskPostOutput() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskPostOutput', new EmptyHandler()); + $this->taskStorage->addTask($dummyTask); + + // First write the task output + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 0)); + + // Then try to read the output + $output = $this->taskStorage->readPostOutput($dummyTask, 1); + $this->assertEquals('postOutput', $output['output']); + $this->assertEquals('errors', $output['errors']); + $this->assertEquals(0, $output['statusCode']); + } + + /** + * @depends testWriteAndReadTaskOutput + */ + public function testWriteAndReadMultiplePostOutput() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadMultiplePostOutput', new EmptyHandler()); + + // Write some task output + $this->taskStorage->addTask($dummyTask); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 0)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 0)); + + // Then try to read all the output + $output = $this->taskStorage->readPostOutput($dummyTask, -1); + $this->assertEquals([ + ['output' => 'output1', 'errors' => 'errors1', 'statusCode' => 0], + ['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 0] + ], $output); + + $this->assertEquals(['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 0], $this->taskStorage->readPostOutput($dummyTask)); + } + + /** + * @depends testWriteAndReadTaskPostOutput + */ + public function testWriteAndReadTaskPostOutputAttempts() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskPostOutputAttempts', new EmptyHandler()); + $this->taskStorage->addTask($dummyTask); + + // Write the different outputs + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output0', 'errors0', 100)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 101)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 102)); + + + // Attempt to load the first output + $output0 = $this->taskStorage->readPostOutput($dummyTask, 1); + $this->assertEquals('output0', $output0['output']); + $this->assertEquals('errors0', $output0['errors']); + $this->assertEquals(100, $output0['statusCode']); + + // Attempt to load the second output + $output1 = $this->taskStorage->readPostOutput($dummyTask, 2); + $this->assertEquals('output1', $output1['output']); + $this->assertEquals('errors1', $output1['errors']); + $this->assertEquals(101, $output1['statusCode']); + + // Attempt to load the third output + $output2 = $this->taskStorage->readPostOutput($dummyTask, 3); + $this->assertEquals('output2', $output2['output']); + $this->assertEquals('errors2', $output2['errors']); + $this->assertEquals(102, $output2['statusCode']); + + // Attempt to load the default output + $output = $this->taskStorage->readPostOutput($dummyTask); + $this->assertEquals('output2', $output['output']); + $this->assertEquals('errors2', $output['errors']); + $this->assertEquals(102, $output['statusCode']); + + // And to load all + $this->assertEquals([ + ['output' => 'output0', 'errors' => 'errors0', 'statusCode' => 100], + ['output' => 'output1', 'errors' => 'errors1', 'statusCode' => 101], + ['output' => 'output2', 'errors' => 'errors2', 'statusCode' => 102] + ], $this->taskStorage->readPostOutput($dummyTask, -1)); + } + + /** + * @depends testWriteAndReadTaskPostOutput + */ + public function testWriteAndReadTaskPostOutputNotExist() + { + // Prepare a dummy task + $dummyTask = new Task('testWriteAndReadTaskPostOutputNotExist', new EmptyHandler()); + $this->taskStorage->addTask($dummyTask); + + $this->assertNull($this->taskStorage->readPostOutput($dummyTask)); + } + + /* ---------------------------------- Data persistence and resets --------- ------------ */ + + /** + * @depends testAddAndReadTasks + * @depends testWriteAndReadTaskOutput + * @depends testWriteAndReadTaskPostOutput + */ + public function testReset() + { + // Prepare a dummy task + $dummyTask = new Task('testReset', new EmptyHandler()); + + // Add the task and some output + $this->assertTrue($this->taskStorage->addTask($dummyTask)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 100)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 100)); + + // Then reset the data + $this->assertTrue($this->taskStorage->reset()); + + // And test if the data is actually gone + $this->assertNull($this->taskStorage->readTaskOutput($dummyTask)); + $this->assertNull($this->taskStorage->readPostOutput($dummyTask)); + $this->expectException(TasksException::class); + $this->taskStorage->getTaskById('testReset'); + } + +} diff --git a/test/base/TaskTest.php b/test/base/TaskTest.php new file mode 100644 index 0000000..4f21316 --- /dev/null +++ b/test/base/TaskTest.php @@ -0,0 +1,269 @@ +assertInstanceOf('FuzeWorks\Async\Task', $dummyTask); + } + + /* ---------------------------------- Basic variables tests --------------------------- */ + + /** + * @depends testClass + */ + public function testBaseVariables() + { + // Create dummy task + $dummyTask = new Task('testBaseVariables', new EmptyHandler(), true); + + // test the values + $this->assertEquals('testBaseVariables', $dummyTask->getId()); + $this->assertInstanceOf(EmptyHandler::class, $dummyTask->getHandler()); + $this->assertTrue($dummyTask->getUsePostHandler()); + } + + /** + * @depends testBaseVariables + */ + public function testArguments() + { + // Create task without arguments + $dummyTask1 = new Task('testArguments1', new EmptyHandler(), true); + $this->assertEmpty($dummyTask1->getArguments()); + + // Now create a task with some arguments + $dummyTask2 = new Task('testArguments2', new EmptyHandler(), true, 'some', 'arguments'); + $this->assertEquals(['some', 'arguments'], $dummyTask2->getArguments()); + } + + /** + * @depends testBaseVariables + */ + public function testPostHandler() + { + // Create dummy tasks + $dummyTask1 = new Task('testPostHandler1', new EmptyHandler(), true); + $dummyTask2 = new Task('testPostHandler2', new EmptyHandler(), false); + + $this->assertTrue($dummyTask1->getUsePostHandler()); + $this->assertFalse($dummyTask2->getUsePostHandler()); + } + + /** + * @depends testBaseVariables + */ + public function testConstraints() + { + // First create a mock constraint + $stub = $this->createMock(Constraint::class); + + // Then add it to the task + $dummyTask = new Task('testConstraints', new EmptyHandler(), false); + $dummyTask->addConstraint($stub); + + // Assert it exists + $this->assertEquals([$stub], $dummyTask->getConstraints()); + } + + /** + * @depends testBaseVariables + */ + public function testInitHandler() + { + $mockHandler = $this->createMock(Handler::class); + $mockHandler->expects($this->once())->method('init') + ->with($this->callback(function($subject){return $subject instanceof Task;})); + + // Then create a class + new Task('testInitHandler', $mockHandler); + } + + /** + * @depends testBaseVariables + */ + public function testStatusCodes() + { + // Create dummy task + $dummyTask = new Task('testStatusCodes', new EmptyHandler(), true); + + for ($i = 1; $i <= 9; $i++) { + $dummyTask->setStatus($i); + $this->assertEquals($i, $dummyTask->getStatus()); + } + } + + /** + * @depends testBaseVariables + */ + public function testDelayTime() + { + // Create dummy task + $dummyTask = new Task('testDelayTime', new EmptyHandler(), true); + + $this->assertEquals(0, $dummyTask->getDelayTime()); + $dummyTask->setDelayTime(1000); + $this->assertEquals(1000, $dummyTask->getDelayTime()); + } + + /** + * @depends testBaseVariables + */ + public function testAttributes() + { + // Create dummy task + $dummyTask = new Task('testAttributes', new EmptyHandler(), true); + + // First test a non-existing attribute + $this->assertNull($dummyTask->attribute('testKey')); + + // Now add it and test if it is there + $dummyTask->addAttribute('testKey', 'SomeContent'); + $this->assertEquals('SomeContent', $dummyTask->attribute('testKey')); + + // Remove attribute + $dummyTask->removeAttribute('testKey'); + $this->assertNull($dummyTask->attribute('testKey')); + + // Remove attribute not found + $this->expectException(TasksException::class); + $dummyTask->removeAttribute('NotExistant'); + } + + /** + * @depends testBaseVariables + */ + public function testOutputsAndErrors() + { + // Create dummy task + $dummyTask = new Task('testOutputsAndErrors', new EmptyHandler(), true); + + // Check if non are filled + $this->assertNull($dummyTask->getOutput()); + $this->assertNull($dummyTask->getPostOutput()); + $this->assertNull($dummyTask->getErrors()); + $this->assertNull($dummyTask->getPostErrors()); + + // Then write some data to the task + $dummyTask->setOutput('SomeOutput', 'SomeErrors'); + $dummyTask->setPostOutput('SomePostOutput', 'SomePostErrors'); + + // And check again + $this->assertEquals('SomeOutput', $dummyTask->getOutput()); + $this->assertEquals('SomePostOutput', $dummyTask->getPostOutput()); + $this->assertEquals('SomeErrors', $dummyTask->getErrors()); + $this->assertEquals('SomePostErrors', $dummyTask->getPostErrors()); + } + + /** + * @depends testBaseVariables + */ + public function testRetrySettings() + { + // Create dummy task + $dummyTask = new Task('testRetrySettings', new EmptyHandler(), true); + + // Test starting position + $this->assertEquals([ + 'retryOnFail' => false, + 'maxRetries' => 2, + 'retryPFailures' => true, + 'retryRFailures' => true, + 'retryPostFailures' => true, + 'maxTime' => 30 + ], $dummyTask->getSettings()); + + // Then change the settings + $dummyTask->setSettings(true, 30, 60, false, false, false); + + // And test the new positions + $this->assertEquals([ + 'retryOnFail' => true, + 'maxRetries' => 30, + 'retryPFailures' => false, + 'retryRFailures' => false, + 'retryPostFailures' => false, + 'maxTime' => 60 + ], $dummyTask->getSettings()); + } + + /** + * @depends testBaseVariables + */ + public function testRetries() + { + // Create dummy task + $dummyTask = new Task('testRetries', new EmptyHandler(), true); + + // First test the starting position + $this->assertEquals(0, $dummyTask->getRetries()); + + // Then add one and test + $dummyTask->addRetry(); + $this->assertEquals(1, $dummyTask->getRetries()); + + // Then reset it and test + $dummyTask->resetRetries(); + $this->assertEquals(0, $dummyTask->getRetries()); + } + + public function testGetStatusType() + { + $this->assertEquals('Task::PENDING', Task::getStatusType(Task::PENDING)); + $this->assertEquals('Task::RUNNING', Task::getStatusType(Task::RUNNING)); + $this->assertEquals('Task::FAILED', Task::getStatusType(Task::FAILED)); + $this->assertEquals('Task::PFAILED', Task::getStatusType(Task::PFAILED)); + $this->assertEquals('Task::SUCCESS', Task::getStatusType(Task::SUCCESS)); + $this->assertEquals('Task::POST', Task::getStatusType(Task::POST)); + $this->assertEquals('Task::COMPLETED', Task::getStatusType(Task::COMPLETED)); + $this->assertEquals('Task::DELAYED', Task::getStatusType(Task::DELAYED)); + $this->assertEquals('Task::CANCELLED', Task::getStatusType(Task::CANCELLED)); + $this->assertFalse(Task::getStatusType(10)); + } +} diff --git a/config.tasks.php b/test/bootstrap.php similarity index 56% rename from config.tasks.php rename to test/bootstrap.php index 922e272..8d389b4 100644 --- a/config.tasks.php +++ b/test/bootstrap.php @@ -33,39 +33,30 @@ * * @version Version 1.0.0 */ -return array( - 'SuperVisor' => [ - 'type' => 'ParallelSuperVisor', - 'parameters' => [] - ], - 'TaskStorage' => [ - 'type' => 'ArrayTaskStorage', - // For ArrayTaskStorage, first parameter is the file location of the array storage - 'parameters' => [ - 'filename' => dirname(__FILE__) . DS . 'storage.php' - ], +require_once(dirname(__DIR__) . '/vendor/autoload.php'); - // For RedisTaskStorage, parameters are connection properties - #'parameters' => [ - # // Type can be 'tcp' or 'unix' - # 'socket_type' => 'tcp', - # // If socket_type == 'unix', set the socket here - # 'socket' => null, - # // If socket_type == 'tcp', set the host here - # 'host' => 'localhost', - # - # 'password' => null, - # 'port' => 6379, - # 'timeout' => 0 - #] - ], - 'Executor' => [ - 'type' => 'ShellExecutor', +use FuzeWorks\Core; +use FuzeWorks\Priority; - // For ShellExecutor, first parameter is the file location of the worker script - 'parameters' => [ - 'workerFile' => dirname(__FILE__) . DS . 'bin' . DS . 'worker' - ] - ] -); \ No newline at end of file +// Open configurator +$configurator = new FuzeWorks\Configurator(); + +// Set up basic settings +$configurator->setTimeZone('Europe/Amsterdam'); +$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp'); +$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'temp'); + +$configurator->addComponent(new \FuzeWorks\MVCRComponent()); + +// Add Async library +$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks'); + +// Add test directory so that config.tasks.php can be loaded +$configurator->addDirectory(dirname(__FILE__), 'config', Priority::HIGH); + +// Add mock directory for tests and other classes +Core::addAutoloadMap('\Mock', dirname(__FILE__) . DIRECTORY_SEPARATOR . 'mock'); + +// Create container +return $configurator->createContainer(); \ No newline at end of file diff --git a/test/config.tasks.php b/test/config.tasks.php new file mode 100644 index 0000000..e3190f1 --- /dev/null +++ b/test/config.tasks.php @@ -0,0 +1,92 @@ + true, + + // Which SuperVisor should be used, and with what settings + 'SuperVisor' => [ + 'type' => Core::getEnv('SUPERVISOR_TYPE', 'ParallelSuperVisor'), + 'ParallelSuperVisor' => ['parameters' => []] + ], + + // Which TaskStorage should be used, and with what settings + 'TaskStorage' => [ + 'type' => Core::getEnv('TASKSTORAGE_TYPE', 'DummyTaskStorage'), + 'DummyTaskStorage' => ['parameters' => []], + 'ArrayTaskStorage' => [ + 'parameters' => [ + 'filename' => Core::getEnv('TASKSTORAGE_ARRAY_FILE', + dirname(__FILE__) . DS . 'temp'. DS . 'storage.php') + ] + ], + 'RedisTaskStorage' => [ + 'parameters' => [ + // Type can be 'tcp' or 'unix' + 'socket_type' => Core::getEnv('TASKSTORAGE_REDIS_SOCKET_TYPE', 'tcp'), + // If socket_type == 'unix', set the socket here + 'socket' => Core::getEnv('TASKSTORAGE_REDIS_SOCKET', null), + // If socket_type == 'tcp', set the host here + 'host' => Core::getEnv('TASKSTORAGE_REDIS_HOST', '127.0.0.1'), + // And some standard settings + 'password' => Core::getEnv('TASKSTORAGE_REDIS_PASSWORD', null), + 'port' => Core::getEnv('TASKSTORAGE_REDIS_PORT', 6379), + 'timeout' => Core::getEnv('TASKSTORAGE_REDIS_TIMEOUT', 0), + 'db_index' => Core::getEnv('TASKSTORAGE_REDIS_DBINDEX', 0), + ] + ], + ], + + // Which Executor should be used, and with what settings + 'Executor' => [ + 'type' => Core::getEnv('EXECUTOR_TYPE', 'ShellExecutor'), + + 'ShellExecutor' => [ + 'parameters' => [ + 'workerFile' => Core::getEnv('EXECUTOR_SHELL_WORKER', + dirname(__FILE__, 2) . DS . 'bin' . DS . 'worker'), + 'bootstrapFile' => Core::getEnv('EXECUTOR_SHELL_BOOTSTRAP', + dirname(__FILE__) . DS . 'bootstrap.php') + ] + ] + ] +); \ No newline at end of file diff --git a/src/FuzeWorks/Async/Process.php b/test/mock/Controllers/EmptyController.php similarity index 66% rename from src/FuzeWorks/Async/Process.php rename to test/mock/Controllers/EmptyController.php index 728adf7..1793d01 100644 --- a/src/FuzeWorks/Async/Process.php +++ b/test/mock/Controllers/EmptyController.php @@ -34,40 +34,45 @@ * @version Version 1.0.0 */ -namespace FuzeWorks\Async; -class Process +namespace Mock\Controllers; +use FuzeWorks\Async\Task; +use FuzeWorks\Controller; + +class EmptyController extends Controller { - const PENDING = 1; - const RUNNING = 2; - const FAILED = 3; - const FINISHED = 4; + public $input; - /** - * The current status of the process - * - * @var int - */ - protected $status = Process::PENDING; + public $task; + public $param1; + public $param2; - /** - * @var int - */ - protected $pid; + public $postTask; - public function __construct(int $pid) + public function setInput(string $input) { - $this->pid = $pid; + $this->input = $input; } - /** - * Receive the process Id of this process - * - * @return int - */ - public function getPid(): int + public function primary(Task $task, $param1, $param2) { - return $this->pid; + $this->task = $task; + $this->param1 = $param1; + $this->param2 = $param2; + + return "Primary success: " . $param1 . ' + ' . $param2 . ' + ' . $this->input; + } + + public function post(Task $task) + { + $this->postTask = $task; + + return "Post success: " . $task->getId(); + } + + public function getTaskStatus(): bool + { + return true; } } \ No newline at end of file diff --git a/test/mock/Controllers/FailingController.php b/test/mock/Controllers/FailingController.php new file mode 100644 index 0000000..eb8ac04 --- /dev/null +++ b/test/mock/Controllers/FailingController.php @@ -0,0 +1,83 @@ +input = $input; + } + + public function primary(Task $task, $param1, $param2) + { + $this->task = $task; + $this->param1 = $param1; + $this->param2 = $param2; + + // Also log some errors + Logger::logError('Logged some task error!'); + + return "Primary success: " . $param1 . ' + ' . $param2; + } + + public function post(Task $task) + { + $this->postTask = $task; + + // Also log some errors + Logger::logError('Logged some post error!'); + + return "Post success: " . $task->getId(); + } + + public function getTaskStatus(): bool + { + return false; + } +} \ No newline at end of file diff --git a/test/mock/Handlers/ArgumentedHandler.php b/test/mock/Handlers/ArgumentedHandler.php new file mode 100644 index 0000000..3b0ae46 --- /dev/null +++ b/test/mock/Handlers/ArgumentedHandler.php @@ -0,0 +1,116 @@ +sleepTime = $sleepTime; + $this->output = $output; + } + + /** + * @inheritDoc + */ + public function init(Task $task) + { + } + + /** + * @inheritDoc + */ + public function primaryHandler(Task $task): bool + { + sleep($this->sleepTime); + return true; + } + + /** + * @inheritDoc + */ + public function getOutput(): string + { + return $this->output; + } + + /** + * @inheritDoc + */ + public function postHandler(Task $task) + { + sleep($this->sleepTime); + return true; + } + + /** + * @inheritDoc + */ + public function getPostOutput(): string + { + return $this->output; + } + + /** + * @inheritDoc + */ + public function getParentHandler(): ?Handler + { + return null; + } + + /** + * @inheritDoc + */ + public function setParentInput(string $input): void + { + } + + /** + * @inheritDoc + */ + public function setParentHandler(Handler $parentHandler): void + { + } +} \ No newline at end of file diff --git a/test/mock/Handlers/EmptyHandler.php b/test/mock/Handlers/EmptyHandler.php new file mode 100644 index 0000000..30c5636 --- /dev/null +++ b/test/mock/Handlers/EmptyHandler.php @@ -0,0 +1,101 @@ + + + + + ./base/ + ./system/ + + + + + + ../ + + ../vendor/ + ../test/ + ../src/Layout/ + ../src/Config/ + + + + \ No newline at end of file diff --git a/test/system/ParallelSuperVisorTest.php b/test/system/ParallelSuperVisorTest.php new file mode 100644 index 0000000..f069a81 --- /dev/null +++ b/test/system/ParallelSuperVisorTest.php @@ -0,0 +1,509 @@ +taskStorage = $tasks->getTaskStorage(); + $this->taskStorage->reset(); + + // Clear events + Events::$listeners = []; + + // And load the ShellExecutor using the execution settings + $this->executor = new ShellExecutor([ + 'bootstrapFile' => dirname(__DIR__, 1) . DIRECTORY_SEPARATOR . 'bootstrap.php', + 'workerFile' => dirname(__DIR__, 2) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker' + ]); + + $this->superVisor = new ParallelSuperVisor($this->taskStorage, $this->executor); + } + + public function testClass() + { + $this->assertInstanceOf('FuzeWorks\Async\Supervisors\ParallelSuperVisor', $this->superVisor); + } + + /* ---------------------------------- Writing and reading tasks ----------------------- */ + + /** + * @depends testClass + */ + public function testEmptyCycle() + { + $this->assertEquals(SuperVisor::FINISHED, $this->superVisor->cycle()); + } + + public function testToRunning() + { + // First create a dummy task + $dummyTask = new Task('testToRunning', new ArgumentedHandler(10, 'Some Output'), false); + + // Write the dummy to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Assert that the status is PENDING and not running + $this->assertEquals(Task::PENDING, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is running for real + $this->assertEquals(Task::RUNNING, $dummyTask->getStatus()); + $this->assertTrue($this->executor->getTaskRunning($dummyTask)); + } + + /** + * @depends testToRunning + */ + public function testConstrainedPending() + { + // First create a dummy task + $dummyTask = new Task('testConstrainedPending', new ArgumentedHandler(10, 'Some Output'), false); + + // Add a constraint + $dummyTask->addConstraint(new FixedTimeConstraint(time() + 3600)); + + // Write the dummy to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Assert that the status is PENDING and not running + $this->assertEquals(Task::PENDING, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is delayed + $this->assertEquals(Task::DELAYED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + } + + /** + * @depends testToRunning + */ + public function testChangeDelayedToPending() + { + // First create a dummy task + $dummyTask = new Task('testChangeDelayedToPending', new ArgumentedHandler(10, 'Some Output'), false); + + // Set to delayed and set to NOW + $dummyTask->setStatus(Task::DELAYED); + $dummyTask->setDelayTime(time() - 3600); + + // Write the dummy to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Assert that the status is DELAYED and not running + $this->assertEquals(Task::DELAYED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is delayed + $this->assertEquals(Task::PENDING, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + } + + /** + * @depends testToRunning + */ + public function testKeepDelayed() + { + // First create a dummy task + $dummyTask = new Task('testKeepDelayed', new ArgumentedHandler(10, 'Some Output'), false); + + // Set to delayed and set to NOW + $dummyTask->setStatus(Task::DELAYED); + $dummyTask->setDelayTime(time() + 3600); + + // Write the dummy to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Assert that the status is DELAYED and not running + $this->assertEquals(Task::DELAYED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is delayed + $this->assertEquals(Task::DELAYED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + } + + /** + * @depends testToRunning + */ + public function testFinishedTask() + { + // First create a dummy task + $dummyTask = new Task('testFinishedTask', new ArgumentedHandler(10, 'Some Output'), false); + + // Set status to running + $dummyTask->setStatus(Task::RUNNING); + $dummyTask->addAttribute('pid', 1005); + + // Write the dummy and some output to TaskStorage + $this->taskStorage->addTask($dummyTask); + $this->taskStorage->writeTaskOutput($dummyTask, 'Some Output', '', Task::SUCCESS); + + // Test if everything is set + $this->assertEquals(Task::RUNNING, $dummyTask->getStatus()); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is finished indeed + $this->assertEquals(Task::SUCCESS, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + $this->assertEquals('Some Output', $dummyTask->getOutput()); + } + + /** + * @depends testFinishedTask + */ + public function testMissingTask() + { + // First create a dummy task + $dummyTask = new Task('testMissingTask', new ArgumentedHandler(10, 'Some Output'), false); + + // Set status to running + $dummyTask->setStatus(Task::RUNNING); + $dummyTask->addAttribute('pid', 1006); + + // Write the dummy and no output to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Test if everything is set + $this->assertEquals(Task::RUNNING, $dummyTask->getStatus()); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it has failed indeed + $this->assertEquals(Task::PFAILED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + } + + /** + * @depends testFinishedTask + */ + public function testFailedTask() + { + // First create a dummy task + $dummyTask = new Task('testFailedTask', new ArgumentedHandler(10, 'Some Output'), false); + + // Set status to running + $dummyTask->setStatus(Task::RUNNING); + $dummyTask->addAttribute('pid', 1007); + + // Write the dummy and some output to TaskStorage + $this->taskStorage->addTask($dummyTask); + $this->taskStorage->writeTaskOutput($dummyTask, 'Some Output', 'Some Errors', Task::FAILED); + + // Test if everything is set + $this->assertEquals(Task::RUNNING, $dummyTask->getStatus()); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Then re-fetch the Task + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + + // And check that it is finished indeed + $this->assertEquals(Task::FAILED, $dummyTask->getStatus()); + $this->assertFalse($this->executor->getTaskRunning($dummyTask)); + $this->assertEquals('Some Errors', $dummyTask->getErrors()); + } + + /** + * @depends testFailedTask + */ + public function testRetryFailedTask() + { + // First create the dummy tasks + $dummyTaskFailedYes = new Task('testRetryFailedTaskY', new ArgumentedHandler(10, 'Some Output'), false); + $dummyTaskPFailedYes = new Task('testRetryFailedTaskPY', new ArgumentedHandler(10, 'Some Output'), false); + $dummyTaskFailedNo = new Task('testRetryFailedTaskN', new ArgumentedHandler(10, 'Some Output'), false); + $dummyTaskPFailedNo = new Task('testRetryFailedTaskPN', new ArgumentedHandler(10, 'Some Output'), false); + + // Set statuses + $dummyTaskFailedYes->setStatus(Task::FAILED); + $dummyTaskPFailedYes->setStatus(Task::PFAILED); + $dummyTaskFailedNo->setStatus(Task::FAILED); + $dummyTaskPFailedNo->setStatus(Task::FAILED); + + // Set retry settings + $dummyTaskFailedYes->setSettings(true, 5, 30, true, true,true); + $dummyTaskPFailedYes->setSettings(true, 5, 30, true, true,true); + $dummyTaskFailedNo->setSettings(true, 5, 30, false, false,true); + $dummyTaskPFailedNo->setSettings(true, 5, 30, false, false,true); + + // Save all these tasks + $this->taskStorage->addTask($dummyTaskFailedYes); + $this->taskStorage->addTask($dummyTaskPFailedYes); + $this->taskStorage->addTask($dummyTaskFailedNo); + $this->taskStorage->addTask($dummyTaskPFailedNo); + + // Then cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // Reload all tasks from TaskStorage + $dummyTaskFailedYes = $this->taskStorage->getTaskById($dummyTaskFailedYes->getId()); + $dummyTaskPFailedYes = $this->taskStorage->getTaskById($dummyTaskPFailedYes->getId()); + $dummyTaskFailedNo = $this->taskStorage->getTaskById($dummyTaskFailedNo->getId()); + $dummyTaskPFailedNo = $this->taskStorage->getTaskById($dummyTaskPFailedNo->getId()); + + // Check if the tasks that should retry are running + $this->assertEquals(Task::RUNNING, $dummyTaskFailedYes->getStatus()); + $this->assertEquals(Task::RUNNING, $dummyTaskPFailedYes->getStatus()); + $this->assertTrue($this->executor->getTaskRunning($dummyTaskFailedYes)); + $this->assertTrue($this->executor->getTaskRunning($dummyTaskPFailedYes)); + + // Check if the tasks that shouldn't have been cancelled + $this->assertEquals(Task::CANCELLED, $dummyTaskFailedNo->getStatus()); + $this->assertEquals(Task::CANCELLED, $dummyTaskPFailedNo->getStatus()); + } + + /** + * @depends testFailedTask + */ + public function testExceedMaxRetries() + { + // First create the dummy tasks + $dummyTask = new Task('testExceedMaxRetries', new ArgumentedHandler(10, 'Some Output'), false); + $dummyTask2 = new Task('testExceedMaxRetries2', new ArgumentedHandler(10, 'Some Output'), false); + + // Set status and retry settings + $dummyTask->setStatus(Task::FAILED); + $dummyTask2->setStatus(Task::FAILED); + $dummyTask->setSettings(true, 2, 30, true, true, true); + $dummyTask2->setSettings(true, 2, 30, true, true, true); + + // Set retries to 2 for the first task, and 1 for the second task + $dummyTask->addRetry(); + $dummyTask->addRetry(); + $dummyTask2->addRetry(); + + // Write the task to TaskStorage + $this->taskStorage->addTask($dummyTask); + $this->taskStorage->addTask($dummyTask2); + + // Cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // And check if the Task has been cancelled + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $dummyTask2 = $this->taskStorage->getTaskById($dummyTask2->getId()); + $this->assertEquals(Task::CANCELLED, $dummyTask->getStatus()); + $this->assertEquals(Task::RUNNING, $dummyTask2->getStatus()); + } + + /** + * @depends testFailedTask + */ + public function testFailedToPost() + { + // First create the dummy tasks + $dummyTask = new Task('testFailedToPost', new ArgumentedHandler(10, 'Some Output'), true); + + // Set status and settings + $dummyTask->setStatus(Task::FAILED); + $dummyTask->addAttribute('pid', 1010); + + // Write the task to TaskStorage + $this->taskStorage->addTask($dummyTask); + + // Cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // And check if the Task has been moved to Post + $dummyTask = $this->taskStorage->getTaskById($dummyTask->getId()); + $this->assertEquals(Task::POST, $dummyTask->getStatus()); + $this->assertTrue($this->executor->getTaskRunning($dummyTask)); + $this->assertIsInt($dummyTask->attribute('pid')); + $this->assertNotEquals(1010, $dummyTask->attribute('pid')); + } + + /** + * @depends testFinishedTask + */ + public function testSuccessfulTasks() + { + // First create the dummy tasks + $dummyTaskPostNo = new Task('testSuccessfulTasksN', new ArgumentedHandler(10, 'Some Output'), false); + $dummyTaskPostYes = new Task('testSuccessfulTasksY', new ArgumentedHandler(10, 'Some Output'), true); + + // Set status and settings + $dummyTaskPostNo->setStatus(Task::SUCCESS); + $dummyTaskPostYes->setStatus(Task::SUCCESS); + + // Write the tasks to TaskStorage + $this->taskStorage->addTask($dummyTaskPostNo); + $this->taskStorage->addTask($dummyTaskPostYes); + + // Cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // And check if the Tasks have been completed or moved to post + $dummyTaskPostNo = $this->taskStorage->getTaskById($dummyTaskPostNo->getId()); + $dummyTaskPostYes = $this->taskStorage->getTaskById($dummyTaskPostYes->getId()); + $this->assertEquals(Task::COMPLETED, $dummyTaskPostNo->getStatus()); + $this->assertEquals(Task::POST, $dummyTaskPostYes->getStatus()); + $this->assertTrue($this->executor->getTaskRunning($dummyTaskPostYes)); + } + + public function testPostTasks() + { + // First create the dummy tasks + $dummyTaskFinished = new Task('testPostTasksFinished', new ArgumentedHandler(10, 'Some Output'), true); + $dummyTaskMissing = new Task('testPostTasksMissing', new ArgumentedHandler(10, 'Some Output'), true); + + // Set status and settings + $dummyTaskFinished->setStatus(Task::POST); + $dummyTaskMissing->setStatus(Task::POST); + $dummyTaskFinished->addAttribute('pid', 1011); + $dummyTaskMissing->addAttribute('pid', 1012); + + // Write the tasks to TaskStorage + $this->taskStorage->addTask($dummyTaskFinished); + $this->taskStorage->addTask($dummyTaskMissing); + $this->taskStorage->writePostOutput($dummyTaskFinished, 'Post Output', 'Post Errors', Task::SUCCESS); + + // Cycle the SuperVisor + $this->superVisor->cycle(); + + // Pause 1/10th of a second + usleep(500000); + + // And check if the Tasks have been completed or failed + $dummyTaskFinished = $this->taskStorage->getTaskById($dummyTaskFinished->getId()); + $dummyTaskMissing = $this->taskStorage->getTaskById($dummyTaskMissing->getId()); + $this->assertEquals(Task::COMPLETED, $dummyTaskFinished->getStatus()); + $this->assertEquals(Task::CANCELLED, $dummyTaskMissing->getStatus()); + $this->assertEquals('Post Output', $dummyTaskFinished->getPostOutput()); + $this->assertNull($dummyTaskMissing->getPostOutput()); + } +} diff --git a/test/temp/placeholder b/test/temp/placeholder new file mode 100644 index 0000000..e69de29 diff --git a/storage.php b/test/temp/storage.php similarity index 100% rename from storage.php rename to test/temp/storage.php