diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..952ad32 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +composer.phar +composer.lock +.idea/ +log/ +vendor/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5a4a7d0 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM php:7.3-cli-buster + +RUN apt-get update &&\ + apt-get install --no-install-recommends --assume-yes --quiet procps ca-certificates curl git &&\ + rm -rf /var/lib/apt/lists/* +# PDO +RUN docker-php-ext-install pdo_mysql \ No newline at end of file diff --git a/Tasks/src/Supervisors/ParallelSuperVisor.php b/Tasks/src/Supervisors/ParallelSuperVisor.php deleted file mode 100644 index 22c5436..0000000 --- a/Tasks/src/Supervisors/ParallelSuperVisor.php +++ /dev/null @@ -1,211 +0,0 @@ -taskStorage = $taskStorage; - $this->executor = $executor; - } - - /** - * @inheritDoc - */ - public function cycle(): int - { - // First: if there are no tasks, load them - $this->taskStorage->refreshTasks(); - $this->tasks = $this->taskStorage->readTasks(); - - // If there are still no tasks, nothing is queued, so this cycle can end. - if (empty($this->tasks)) - return SuperVisor::FINISHED; - - // Check if all tasks are completed - $allCompleted = true; - foreach ($this->tasks as $task) - if ($task->getStatus() !== Task::COMPLETED) - $allCompleted = false; - - if ($allCompleted) - return SuperVisor::FINISHED; - - for ($i=0;$itasks);$i++) - { - $task = $this->tasks[$i]; - dump(date('H:i:s') . ': ' . $task->getId() .'/'.$i . ": state #" . $task->getStatus()); - - // DELAYED Tasks. If a task is delayed, but the time has passed, mark the task as pending - if ($task->getStatus() === Task::DELAYED && time() < $task->getDelayTime()) - $task->setStatus(Task::PENDING); - - // FAILED Tasks, check if failed can be made pending using constraints - elseif ($task->getStatus() === Task::FAILED) - $task = $this->testConstraints($task); - - // FINISHED/CANCELLED/FAILED Tasks, check if they are old enough for removal - elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED || $task->getStatus() === Task::FAILED) - { - } - - // If the task is pending, start if not constrained - elseif ($task->getStatus() === Task::PENDING) - { - // First test for constraints. - $task = $this->testConstraints($task); - - // If the task is still PENDING, execute it - if ($task->getStatus() !== Task::PENDING) - continue; - - // START THE PROCESS - $task = $this->executor->startTask($task); - $this->taskStorage->modifyTask($task); - } - - // Check if tasks are still running - elseif ($task->getStatus() === Task::RUNNING) - { - $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readTaskOutput($task); - $hasOutput = !is_null($output); - - if (!$isRunning && !$hasOutput) - $task->setStatus(Task::FAILED); - elseif (!$isRunning && $hasOutput) - { - try { - $task->setOutput($output['output'], $output['errors']); - $task->setStatus($output['statusCode']); - } catch (TasksException $e) { - // Ignore - } - } - // elseif (EXPIRED) - else - continue; - - $this->taskStorage->modifyTask($task); - } - - // Check if task succeeded, and whether the task must progress to POST or COMPLETED - // @todo Add failed handler - elseif ($task->getStatus() === Task::SUCCESS) - { - // Calling post handler when requested to do so - if ($task->getCallPostHandler() === true || ($task->getCallPostHandlerWhenFailed() === true && $output['statusCode'] === Task::FAILED)) - $task = $this->executor->startTask($task, true); - else - $task->setStatus(Task::COMPLETED); - - $this->taskStorage->modifyTask($task); - } - - // If the task is in post mode, check when that process has ended - elseif ($task->getStatus() === Task::POST) - { - $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readTaskOutput($task); - $hasOutput = isset($output['postStatus']); - - if (!$isRunning && !$hasOutput) - $task->setStatus(Task::FAILED); - elseif (!$isRunning && $hasOutput) - { - try { - $task->setPostOutput($output['postOutput'], $output['postErrors']); - $task->setStatus(Task::COMPLETED); - } catch (TasksException $e) { - // Ignore - } - } - else - continue; - - $this->taskStorage->modifyTask($task); - } - - - - } - - return SuperVisor::RUNNING; - } - - private function testConstraints(Task $task): Task - { - $constraints = $task->getConstraints(); - foreach ($constraints as $constraint) - { - if ($constraint->intervene($task) && $constraint->blockCode() != 0) - { - $task->setStatus($constraint->blockCode()); - if ($constraint->blockCode() === Task::DELAYED) - $task->setDelayTime(time() + $constraint->delayTime()); - - // Save changes to TaskStorage - $this->taskStorage->modifyTask($task); - } - } - - return $task; - } - - -} \ No newline at end of file diff --git a/Tasks/supervisor.php b/Tasks/supervisor.php deleted file mode 100644 index 1401a0d..0000000 --- a/Tasks/supervisor.php +++ /dev/null @@ -1,84 +0,0 @@ -setTimeZone('Europe/Amsterdam'); -$configurator->setTempDirectory(dirname(__DIR__) . DIRECTORY_SEPARATOR . 'temp'); -$configurator->setLogDirectory(dirname(__DIR__). DIRECTORY_SEPARATOR . 'log'); - -// Enable components -// WebComponent -$webAppComponent = new WebAppComponent(); -$configurator->addComponent($webAppComponent); - -// Add directories -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Config', 'config', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Helper', 'helpers', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Library', 'libraries', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Plugin', 'plugins', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Controller', 'controllers', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'View', 'views', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Model', 'models', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Layout', 'layouts', Priority::NORMAL); - -// Debug -$configurator->enableDebugMode()->setDebugAddress('ALL'); -$container = $configurator->createContainer(); - -Logger::enableScreenLog(); - -// RUN THE APP -/** @var Tasks $lib */ -$lib = $container->libraries->get('tasks'); - -$supervisor = $lib->getSuperVisor(); -while ($supervisor->cycle() === SuperVisor::RUNNING) { - usleep(250000); -} \ No newline at end of file diff --git a/bin/supervisor b/bin/supervisor new file mode 100644 index 0000000..28fda4b --- /dev/null +++ b/bin/supervisor @@ -0,0 +1,99 @@ +#!/usr/bin/env php +')) { + fwrite( + STDERR, + sprintf( + 'FuzeWorks Async requires PHP 7.1 or higher.' . PHP_EOL . + 'You are using PHP %s (%s).' . PHP_EOL, + PHP_VERSION, + PHP_BINARY + ) + ); + + die(1); +} + +// First load composer +$autoloaders = [ + __DIR__ . '/../../autoload.php', + __DIR__ . '/../vendor/autoload.php', + __DIR__ . '/vendor/autoload.php' +]; +foreach ($autoloaders as $file) + if (file_exists($file)) + require($file); + + +try { + // Open configurator + $configurator = new FuzeWorks\Configurator(); + + // Set up basic settings + $configurator->setTimeZone('Europe/Amsterdam'); + $configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp'); + $configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log'); + + // Add Async library + $configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks'); + + // Debug + $configurator->enableDebugMode()->setDebugAddress('ALL'); + + // Create container + $container = $configurator->createContainer(); + + // RUN THE APP + /** @var Tasks $lib */ + $lib = $container->libraries->get('async'); + + $supervisor = $lib->getSuperVisor(); + while ($supervisor->cycle() === SuperVisor::RUNNING) { + usleep(250000); + } +} catch (InvalidArgumentException | TasksException | LibraryException $e) { + fwrite(STDERR, sprintf('FuzeWorks Async could not load.' . PHP_EOL . + 'Exception: ' . $e->getMessage() . PHP_EOL) + ); +} \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..2a3620e --- /dev/null +++ b/composer.json @@ -0,0 +1,26 @@ +{ + "name": "fuzeworks/async", + "description": "FuzeWorks Async task execution library", + "license": ["MIT"], + "authors": [ + { + "name": "TechFuze", + "homepage": "https://techfuze.net" + }, + { + "name": "FuzeWorks Community", + "homepage": "https://techfuze.net/fuzeworks/contributors" + } + ], + "require": { + "php": ">=7.2.0", + "fuzeworks/core": "~1.2.0", + "ext-json": "*" + }, + "autoload": { + "psr-4": { + "FuzeWorks\\Async\\": "src/FuzeWorks/Async" + } + }, + "bin": ["bin/supervisor"] +} \ No newline at end of file diff --git a/Tasks/config.tasks.php b/config.tasks.php similarity index 87% rename from Tasks/config.tasks.php rename to config.tasks.php index 73b5b6b..21f14d5 100644 --- a/Tasks/config.tasks.php +++ b/config.tasks.php @@ -1,10 +1,10 @@ [ @@ -48,6 +48,6 @@ return array( 'type' => 'ShellExecutor', // For ShellExecutor, first parameter is the file location of the worker script - 'parameters' => [dirname(dirname(__DIR__)) . DS . 'worker.php'] + 'parameters' => [dirname(__FILE__) . DS . 'worker.php'] ] ); \ No newline at end of file diff --git a/Tasks/src/Constraint.php b/src/FuzeWorks/Async/Constraint.php similarity index 87% rename from Tasks/src/Constraint.php rename to src/FuzeWorks/Async/Constraint.php index c1ec515..9b3f100 100644 --- a/Tasks/src/Constraint.php +++ b/src/FuzeWorks/Async/Constraint.php @@ -1,10 +1,10 @@ dependencies = $dependencyList; + $this->delayTimes = $delayTimes; + + // Test if async library can be loaded + $this->loadTasksLib(); + } + + /** + * @inheritDoc + */ + public function intervene(Task $task): bool + { + // Fetch taskStorage + try { + $tasks = $this->loadTasksLib(); + $taskStorage = $tasks->getTaskStorage(); + + // Is any dependency unresolved? + $hasUnresolved = false; + + // Test if any dependency has not been resolved + foreach ($this->dependencies as $dependency) + { + // Get dependency + $dependencyTask = $taskStorage->getTaskById($dependency); + + + // If the dependency task is completed, ignore it and continue to next dependency + if ($dependencyTask->getStatus() === Task::COMPLETED) + continue; + elseif ($dependencyTask->getStatus() === Task::CANCELLED) + { + // Cancel current task + $task->setOutput('', 'Task cancelled due to failed dependency.'); + $this->returnStatus = Task::CANCELLED; + return true; + } + else + $hasUnresolved = true; + } + + // If any unresolved tasks exist, delay task execution + if ($hasUnresolved) + { + $this->returnStatus = Task::DELAYED; + return true; + } + } catch (TasksException $e) { + Logger::logError("Could not constraint task. '" . $e->getMessage() . "'"); + $this->returnStatus = Task::FAILED; + return true; + } + + return false; + } + + /** + * @inheritDoc + */ + public function blockCode(): int + { + return $this->returnStatus; + } + + /** + * @inheritDoc + */ + public function delayTime(): int + { + return time() + $this->delayTimes; + } + + /** + * Load the tasks library, so that dependencies can get scanned later + * + * @return Tasks + * @throws TasksException + */ + private function loadTasksLib(): Tasks + { + try { + /** @var Libraries $libraries */ + $libraries = Factory::getInstance('libraries'); + + /** @var Tasks $tasks */ + $tasks = $libraries->get('async'); + + return $tasks; + } catch (FactoryException | LibraryException $e) { + throw new TasksException("Could not constrain task. Async library could not be loaded."); + } + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php b/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php new file mode 100644 index 0000000..eb17a0c --- /dev/null +++ b/src/FuzeWorks/Async/Constraint/FixedTimeConstraint.php @@ -0,0 +1,96 @@ +timestamp = $unitTimeStamp; + } + + /** + * @inheritDoc + */ + public function intervene(Task $task): bool + { + if ($task->getStatus() === Task::PENDING && time() < $this->timestamp) + return true; + + return false; + } + + /** + * @inheritDoc + */ + public function blockCode(): int + { + return Task::DELAYED; + } + + /** + * @inheritDoc + */ + public function delayTime(): int + { + return $this->timestamp; + } +} \ No newline at end of file diff --git a/Tasks/src/Events/TaskHandleEvent.php b/src/FuzeWorks/Async/Events/TaskHandleEvent.php similarity index 86% rename from Tasks/src/Events/TaskHandleEvent.php rename to src/FuzeWorks/Async/Events/TaskHandleEvent.php index 27e52f2..296f9ef 100644 --- a/Tasks/src/Events/TaskHandleEvent.php +++ b/src/FuzeWorks/Async/Events/TaskHandleEvent.php @@ -1,10 +1,10 @@ worker = $workerFile; } - private function shellExec($format, array $parameters = []) { $parameters = array_map("escapeshellarg", $parameters); @@ -81,18 +81,8 @@ class ShellExecutor implements Executor // Then execute the command using the base64_encoded string of the taskID $output = $this->shellExec($commandString, [base64_encode($task->getId())]); - - // Add the PID to the task and change its state to 'RUNNING' - if ($post) - { - $task->setStatus(Task::POST); - $task->addAttribute('post_pid', $output[0]); - } - else - { - $task->setStatus(Task::RUNNING); - $task->addAttribute('run_pid', $output[0]); - } + $pid = intval($output[0]); + $task->setProcess(new Process($pid)); // And finally return the task return $task; @@ -114,28 +104,38 @@ class ShellExecutor implements Executor // First prepare the command used to gather info on processes $commandString = "ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d"; - // Then fetch the PID - try { - $pid = is_null($task->attribute('post_pid')) ? $task->attribute('run_pid') : $task->attribute('post_pid'); - $output = $this->shellExec($commandString, [$pid]); - - if (count($output) < 1) - return null; - - $last = $output[count($output) - 1]; - if (trim($last) === "") - return null; - - $parts = preg_split("/\s+/", trim($last)); - $pid = intval($parts[0]); - $state = strtoupper(trim($parts[3])); - if ("{$pid}" !== $parts[0] || $state === 'Z') - return null; - - return $parts; - } catch (TasksException $e) { + // First we must determine what process is used. + $process = $task->getProcess(); + if (is_null($process)) return null; - } + + // Then using that process we determine the ProcessID + $pid = $process->getPid(); + + // And we execute the commandString to fetch info on the process + $output = $this->shellExec($commandString, [$pid]); + + // If not output is provided, the command failed and should return null + if (count($output) < 1) + return null; + + // ?? + $last = $output[count($output) - 1]; + if (trim($last) === "") + return null; + + // Split up the info + $parts = preg_split("/\s+/", trim($last)); + + // Determine the state of the process + // If the process is in a 'zombie' state, it should be considered fully executed. + // Cleanup of Zombie processes must take place by periodically restarting the SuperVisor, or by using a SuperVisor which does not have the zombie problem + $state = strtoupper(trim($parts[3])); + if ("{$pid}" !== $parts[0] || $state === 'Z') + return null; + + // Finally, return the Task information + return $parts; } public function getTaskExitCode(Task $task): int diff --git a/Tasks/src/Handler.php b/src/FuzeWorks/Async/Handler.php similarity index 88% rename from Tasks/src/Handler.php rename to src/FuzeWorks/Async/Handler.php index 72e4a7d..fe7d4ec 100644 --- a/Tasks/src/Handler.php +++ b/src/FuzeWorks/Async/Handler.php @@ -1,10 +1,10 @@ pid = $pid; + } + + /** + * Receive the process Id of this process + * + * @return int + */ + public function getPid(): int + { + return $this->pid; + } + +} \ No newline at end of file diff --git a/Tasks/src/SuperVisor.php b/src/FuzeWorks/Async/SuperVisor.php similarity index 90% rename from Tasks/src/SuperVisor.php rename to src/FuzeWorks/Async/SuperVisor.php index 2508fac..e894b88 100644 --- a/Tasks/src/SuperVisor.php +++ b/src/FuzeWorks/Async/SuperVisor.php @@ -1,10 +1,10 @@ taskStorage = $taskStorage; + $this->executor = $executor; + } + + /** + * @inheritDoc + */ + public function cycle(): int + { + // First: if there are no tasks, load them + $this->taskStorage->refreshTasks(); + $this->tasks = $this->taskStorage->readTasks(); + + // If there are still no tasks, nothing is queued, so this cycle can end. + if (empty($this->tasks)) + return SuperVisor::FINISHED; + + for ($i=0;$itasks);$i++) + { + $task = $this->tasks[$i]; + + // PENDING: should start if not constrained + if ($task->getStatus() === Task::PENDING) + { + // Test if constrained + $task = $this->testConstraints($task); + + // If the task changed status, task is no longer pending and should be processed by another statement + if ($task->getStatus() !== Task::PENDING) + continue; + + // Start the process using the executor service + $task = $this->executor->startTask($task); + $task->setStatus(Task::RUNNING); + + // Modify the task in TaskStorage + $this->taskStorage->modifyTask($task); + } + + // DELAYED: If task is delayed, and enough time has passed, change the status back to pending + elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime()) + { + $task->setStatus(Task::PENDING); + $this->taskStorage->modifyTask($task); + } + + // CANCELLED/COMPLETED: remove the task if requested to do so + elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED) + { + } + + // RUNNING: check if task is still running. If not, set result based on output + elseif ($task->getStatus() === Task::RUNNING) + { + $isRunning = $this->executor->getTaskRunning($task); + $output = $this->taskStorage->readTaskOutput($task, $task->getRetries()); + $hasOutput = !is_null($output); + + // If nothing is found, the process has crashed and status PFAILED should be set + if (!$isRunning && !$hasOutput) + $task->setStatus(Task::PFAILED); + // @todo Set PFAILED after $max_Time + // If output is found, use the status code from that + elseif (!$isRunning && $hasOutput) + { + try { + $task->setOutput($output['output'], $output['errors']); + $task->setStatus($output['statusCode']); + } catch (TasksException $e) { + // On failure to set output, consider as a process failure + $task->setStatus(Task::PFAILED); + } + } + // In any other situation the process is still running and should be left alone + else + continue; + + // If any changes have been made, they should be written to TaskStorage + $this->taskStorage->modifyTask($task); + } + + // FAILED: if a process has failed, attempt to rety if requested to do so + elseif ($task->getStatus() === Task::PFAILED || $task->getStatus() === Task::FAILED) + { + // First fetch retry conditions + $settings = $task->getRetrySettings(); + + // First test if any retries should be tried at all + if ($settings['retryOnFail'] === true && $task->getRetries() < $settings['maxRetries']) + { + // Then test if this type of failure should be retried and whether the mexRetries has been exceeded + if ( + ($task->getStatus() === Task::PFAILED && $settings['retryPFailures'] === true) || + ($task->getStatus() === Task::FAILED && $settings['retryRFailures'] === true) + ) + { + // If eligible, reset task to pending + $task->addRetry(); + $task = $this->executor->startTask($task); + $task->setStatus(Task::RUNNING); + $this->taskStorage->modifyTask($task); + continue; + } + } + + // If the task is not eligible for a retry, either cancel it or move it to a postHandler + if ($task->getUsePostHandler() === true) + { + $task->resetRetries(); + $task = $this->executor->startTask($task, true); + $task->setStatus(Task::POST); + } + else + $task->setStatus(Task::CANCELLED); + + $this->taskStorage->modifyTask($task); + } + + // SUCCESS: if a task has succeeded, see if it needs a postHandler + elseif ($task->getStatus() === Task::SUCCESS) + { + if ($task->getUsePostHandler() === true) + { + $task->resetRetries(); + $task = $this->executor->startTask($task, true); + $task->setStatus(Task::POST); + } + else + $task->setStatus(Task::COMPLETED); + + $this->taskStorage->modifyTask($task); + } + + // POST: when a task is currently running in it's postHandler + elseif ($task->getStatus() === Task::POST) + { + $isRunning = $this->executor->getTaskRunning($task); + $output = $this->taskStorage->readPostOutput($task, $task->getRetries()); + $hasOutput = !is_null($output); + + // If a task is not running and has no output, an error has occurred + if (!$isRunning && !$hasOutput) + { + // Test if a retry should be attempted + $settings = $task->getRetrySettings(); + if ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] > $task->getRetries()) + { + $task->addRetry(); + $task = $this->executor->startTask($task, true); + } + elseif ($settings['maxRetries'] <= $task->getRetries()) + $task->setStatus(Task::CANCELLED); + } + // @todo Retry after $max_Time + // If a task is not running and has output, set that output and mark as completed + elseif (!$isRunning && $hasOutput) + { + $task->setPostOutput($output['output'], $output['errors']); + $task->setStatus(Task::COMPLETED); + } + // If the task is still running, leave it be + else + continue; + + // If any changes have been made, they should be written to TaskStorage + $this->taskStorage->modifyTask($task); + } + + } + + // Check if all tasks are completed + $allCompleted = true; + $anyDelayed = false; + foreach ($this->tasks as $task) + { + if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED) + $allCompleted = false; + elseif ($task->getStatus() === Task::DELAYED) + $anyDelayed = true; + } + + // If all are finished and none are delayed + if ($allCompleted && !$anyDelayed) + return SuperVisor::FINISHED; + if ($allCompleted && $anyDelayed) + return SuperVisor::CONSTRAINED; + else + return SuperVisor::RUNNING; + } + + private function testConstraints(Task $task): Task + { + $constraints = $task->getConstraints(); + foreach ($constraints as $constraint) + { + if ($constraint->intervene($task) && $constraint->blockCode() != 0) + { + $task->setStatus($constraint->blockCode()); + if ($constraint->blockCode() === Task::DELAYED) + $task->setDelayTime($constraint->delayTime()); + + // Save changes to TaskStorage + $this->taskStorage->modifyTask($task); + } + } + + return $task; + } + + +} \ No newline at end of file diff --git a/Tasks/src/Task.php b/src/FuzeWorks/Async/Task.php similarity index 56% rename from Tasks/src/Task.php rename to src/FuzeWorks/Async/Task.php index ccd95b2..3367929 100644 --- a/Tasks/src/Task.php +++ b/src/FuzeWorks/Async/Task.php @@ -1,10 +1,10 @@ taskId = $identifier; $this->handlerClass = $handlerClass; + $this->usePostHandler = $usePostHandler; if (func_num_args() > 3) $args = array_slice(func_get_args(), 3); else @@ -162,6 +232,16 @@ class Task return $this->handlerClass; } + /** + * Whether the postHandler on the handlerClass should be invoked after processing the initial task. + * + * @return bool + */ + public function getUsePostHandler(): bool + { + return $this->usePostHandler; + } + /** * Gets the arguments to be provided to the method of the class that shall process this task * @@ -205,6 +285,8 @@ class Task /** * Sets the status of this Task. * + * Must be one of the constants of this Task class + * * @param int $status */ public function setStatus(int $status) @@ -240,10 +322,10 @@ class Task */ public function attribute(string $key) { - if (!isset($this->data[$key])) + if (!isset($this->attributes[$key])) return null; - return $this->data[$key]; + return $this->attributes[$key]; } /** @@ -258,7 +340,7 @@ class Task if (!$this->isSerializable($value)) throw new TasksException("Could not set Task '$this->taskId' attribute '$key'. Value not serializable."); - $this->data[$key] = $value; + $this->attributes[$key] = $value; } /** @@ -276,31 +358,6 @@ class Task return $this->postOutput; } - - /** - * Sets the conditions for when the Handler::postHandler should be called. - * - * If you want the postHandler to only get called on errors, set onEvery to false and onFail to true. - * - * @param bool $onEvery Call the post handler on every result - * @param bool $onFail Call the post handler only when failed - */ - public function setPostHandler(bool $onEvery = false, bool $onFail = false) - { - $this->callPostHandler = $onEvery; - $this->callPostHandlerWhenFailed = $onFail; - } - - public function getCallPostHandler(): bool - { - return $this->callPostHandler; - } - - public function getCallPostHandlerWhenFailed(): bool - { - return $this->callPostHandlerWhenFailed; - } - /** * Return the errors of this task execution * @@ -317,24 +374,121 @@ class Task } + /** + * @todo Handle output from multiple attempts + * @param string $output + * @param string $errors + */ public function setOutput(string $output, string $errors) { - if (!is_null($this->output) || !is_null($this->errors)) - throw new TasksException("Could not set output. Output already set."); - $this->output = $output; $this->errors = $errors; } + /** + * @todo Handle output from multiple attempts + * @param string $output + * @param string $errors + */ public function setPostOutput(string $output, string $errors) { - if (!is_null($this->postOutput) || !is_null($this->postErrors)) - throw new TasksException("Could not set post output. Output already set."); - $this->postOutput = $output; $this->postErrors = $errors; } + /** + * Sets the initial process for this task. + * + * To be set by Executor + * + * @param Process $process + */ + public function setProcess(Process $process) + { + $this->process = $process; + } + + /** + * Returns the initial process for this task + * + * @return Process|null + */ + public function getProcess(): ?Process + { + return $this->process; + } + + public function removeProcess(): bool + { + if ($this->process instanceof Process) + { + $this->process = null; + return true; + } + + return false; + } + + /** + * Set whether this task should retry after a failure, and how many times + * + * @param bool $retryOnFail + * @param int $maxRetries + * @param bool $retryRegularFailures + * @param bool $retryProcessFailures + * @param bool $retryPostFailures + */ + public function setRetrySettings(bool $retryOnFail, int $maxRetries = 2, bool $retryRegularFailures = true, bool $retryProcessFailures = true, bool $retryPostFailures = true) + { + $this->retryOnFail = $retryOnFail; + $this->maxRetries = $maxRetries; + $this->retryPFailures = $retryProcessFailures; + $this->retryRFailures = $retryRegularFailures; + $this->retryPostFailures = $retryPostFailures; + } + + /** + * Returns the failure retry settings + * + * @return array + */ + public function getRetrySettings(): array + { + return [ + 'retryOnFail' => $this->retryOnFail, + 'maxRetries' => $this->maxRetries, + 'retryPFailures' => $this->retryPFailures, + 'retryRFailures' => $this->retryRFailures, + 'retryPostFailures' => $this->retryPostFailures + ]; + } + + /** + * Add a retry to the retry counter + */ + public function addRetry() + { + $this->retries++; + } + + /** + * Reset the retry counter back to 0 + */ + public function resetRetries() + { + $this->retries = 0; + } + + /** + * Receive the amount of retries already attempted + * + * @return int + */ + public function getRetries(): int + { + return $this->retries; + } + /** * Checks whether an object can be serialized * diff --git a/Tasks/src/TaskFailException.php b/src/FuzeWorks/Async/TaskFailException.php similarity index 85% rename from Tasks/src/TaskFailException.php rename to src/FuzeWorks/Async/TaskFailException.php index 6e72792..52cb7d4 100644 --- a/Tasks/src/TaskFailException.php +++ b/src/FuzeWorks/Async/TaskFailException.php @@ -1,10 +1,10 @@ getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. * * @param Task $task * @param string $output * @param string $errors * @param int $statusCode + * @param int $attempt * @return bool + * @throws TasksException */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool; + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; + + /** + * Write the output of the postHandler into TaskStorage + * + * $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. + * + * @param Task $task + * @param string $output + * @param string $errors + * @param int $statusCode + * @param int $attempt + * @return bool + * @throws TasksException + */ + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; /** * Read the task output from taskStorage. * * The following output is expected: * array('output' => string $output, 'errors' => string $errors, 'status' => $code) - * OR null of not found (yet) + * OR null if not found (yet) + * + * $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. * * @param Task $task + * @param int $attempt * @return array */ - public function readTaskOutput(Task $task): ?array; + public function readTaskOutput(Task $task, int $attempt = 0): ?array; + + /** + * Read the output from the postHandler + * + * The following output is expected: + * array('output' => string $output, 'errors' => string $errors, 'status' => $code) + * OR null if not found (yet) + * + * $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output. + * + * @param Task $task + * @param int $attempt + * @return array|null + */ + public function readPostOutput(Task $task, int $attempt = 0): ?array; } \ No newline at end of file diff --git a/Tasks/src/TaskStorage/ArrayTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php similarity index 83% rename from Tasks/src/TaskStorage/ArrayTaskStorage.php rename to src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php index 99f717a..cd3197b 100644 --- a/Tasks/src/TaskStorage/ArrayTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/ArrayTaskStorage.php @@ -1,10 +1,10 @@ fileName); - $file = $dir . DS . 'task_' . md5($task->getId()) . '_output.json'; + $file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_output.json'; $contents = json_encode(['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]); // If file already exists, panic @@ -205,11 +205,11 @@ class ArrayTaskStorage implements TaskStorage * @inheritDoc * @throws TasksException */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool { // Get the directory of the main storage $dir = dirname($this->fileName); - $file = $dir . DS . 'task_' . md5($task->getId()) . '_post_output.json'; + $file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_post_output.json'; $contents = json_encode(['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]); // If file already exists, panic @@ -225,12 +225,12 @@ class ArrayTaskStorage implements TaskStorage /** * @inheritDoc */ - public function readTaskOutput(Task $task): ?array + public function readTaskOutput(Task $task, int $attempt = 0): ?array { // Get the directory of the main storage $dir = dirname($this->fileName); - $file = $dir . DS . 'task_' . md5($task->getId()) . '_output.json'; - $file2 = $dir . DS . 'task_' . md5($task->getId()) . '_post_output.json'; + $file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_output.json'; + // If file doesn't exist, return so if (!file_exists($file)) @@ -239,20 +239,27 @@ class ArrayTaskStorage implements TaskStorage // Decode the contents $contents = file_get_contents($file); $data = json_decode($contents, true); - $output = ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; - // Add post data if necessary - if (file_exists($file2)) - { - $postContents = file_get_contents($file2); - $postData = json_decode($postContents, true); - $output['postOutput'] = $postData['output']; - $output['postErrors'] = $postData['errors']; - $output['postStatus'] = $postData['statusCode']; - } + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } - // And return them - return $output; + /** + * @inheritDoc + */ + public function readPostOutput(Task $task, int $attempt = 0): ?array + { + // Get the directory of the main storage + $dir = dirname($this->fileName); + $file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_post_output.json'; + + // If file doesn't exist, return so + if (!file_exists($file)) + return null; + + // Decode the contents + $contents = file_get_contents($file); + $data = json_decode($contents, true); + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; } private function commit() diff --git a/Tasks/Tasks.php b/src/FuzeWorks/Async/Tasks.php similarity index 84% rename from Tasks/Tasks.php rename to src/FuzeWorks/Async/Tasks.php index fe858eb..1a8bbab 100644 --- a/Tasks/Tasks.php +++ b/src/FuzeWorks/Async/Tasks.php @@ -1,10 +1,10 @@ addComponentPath(dirname(__FILE__), Priority::LOW); + $config->addComponentPath(dirname(__FILE__, 4), Priority::LOW); $this->cfg = $config->getConfig('tasks'); } @@ -87,7 +80,7 @@ class Tasks implements iLibrary public function getSuperVisor(): SuperVisor { $cfg = $this->cfg->get('SuperVisor'); - $class = 'Application\Library\Tasks\Supervisors\\' . $cfg['type']; + $class = 'FuzeWorks\Async\Supervisors\\' . $cfg['type']; $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; array_unshift($parameters, $this->getTaskStorage(), $this->getExecutor()); if (!class_exists($class, true)) @@ -118,7 +111,7 @@ class Tasks implements iLibrary public function getTaskStorage(): TaskStorage { $cfg = $this->cfg->get('TaskStorage'); - $class = 'Application\Library\Tasks\TaskStorage\\' . $cfg['type']; + $class = 'FuzeWorks\Async\TaskStorage\\' . $cfg['type']; $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; if (!class_exists($class, true)) throw new TasksException("Could not get TaskStorage. Type of '$class' not found."); @@ -139,7 +132,7 @@ class Tasks implements iLibrary protected function getExecutor(): Executor { $cfg = $this->cfg->get('Executor'); - $class = 'Application\Library\Tasks\Executors\\' . $cfg['type']; + $class = 'FuzeWorks\Async\Executors\\' . $cfg['type']; $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; if (!class_exists($class, true)) throw new TasksException("Could not get Executor. Type of '$class' not found."); @@ -156,7 +149,7 @@ class Tasks implements iLibrary */ public function getClassesPrefix(): ?string { - return '\Application\Library\Tasks'; + return null; } /** @@ -164,6 +157,6 @@ class Tasks implements iLibrary */ public function getSourceDirectory(): ?string { - return 'src'; + return null; } } \ No newline at end of file diff --git a/Tasks/src/TasksException.php b/src/FuzeWorks/Async/TasksException.php similarity index 86% rename from Tasks/src/TasksException.php rename to src/FuzeWorks/Async/TasksException.php index 091e1d9..f3dcf56 100644 --- a/Tasks/src/TasksException.php +++ b/src/FuzeWorks/Async/TasksException.php @@ -1,10 +1,10 @@ taskStorage = $taskStorage; @@ -72,45 +82,65 @@ class Worker $event = Events::fireEvent(new TaskHandleEvent(), $task); $task = $event->getTask(); + // Set task to this worker + $this->task = $task; + $this->post = $post; + // Fetch the callable - $class = $task->getHandlerClass(); + $class = $this->task->getHandlerClass(); if (!class_exists($class, true)) + { + $errors = 'Could not run task. HandlerClass \'' . $class . '\' not found.'; + if (!$post) + $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); + else + $this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); + throw new TasksException("Could not run task. '$class' not found."); + } // Create the handler /** @var Handler $object */ $object = new $class(); if (!$object instanceof Handler) + { + $errors = "Could not run task. '$class' is not instance of Handler."; + if (!$post) + $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); + else + $this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries()); + throw new TasksException("Could not run task. '$class' is not instance of Handler."); + } // Run postHandler if post mode is requested if ($post) { - $postSuccess = $object->postHandler($task); + $postSuccess = $object->postHandler($this->task); $postOutput = $object->getPostOutput(); $postOutput = is_null($postOutput) ? '' : (string) $postOutput; $postErrors = $this->getErrors(); if (!$postSuccess) - $this->taskStorage->writePostOutput($task, $postOutput, $postErrors, Task::FAILED); + $this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::FAILED, $this->task->getRetries()); else - $this->taskStorage->writePostOutput($task, $postOutput, $postErrors, Task::SUCCESS); + $this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::SUCCESS, $this->task->getRetries()); $this->output($postOutput, $postErrors); return; } - // And execute - $success = $object->primaryHandler($task); + // Run primaryHandler if requested + $success = $object->primaryHandler($this->task); $output = $object->getOutput(); $output = is_null($output) ? '' : (string) $output; $errors = $this->getErrors(); // And afterwards write the results to the TaskStorage if (!$success) - $this->taskStorage->writeTaskOutput($task, $output, $errors, Task::FAILED); + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries()); else - $this->taskStorage->writeTaskOutput($task, $output, $errors, Task::SUCCESS); + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries()); $this->output($output, $errors); } @@ -128,6 +158,17 @@ class Worker // Collect all error logs $errors = $this->getErrors(); $this->output('', $errors); + + try { + // Write to TaskStorage + if (!$this->post) + $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + else + $this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + } catch (TasksException $e) { + // Ignore + } + exit; } @@ -145,8 +186,7 @@ class Worker continue; $output[] = strtoupper($log['type']) . ' ' . - (!empty($log['logFile']) && !empty($log['logLine']) ? $log['logFile'] . ':' . $log['logLine'] . " '" : '') . - $log['message']; + (!empty($log['logFile']) && !empty($log['logLine']) ? $log['logFile'] . ':' . $log['logLine'] . " '" : "'") . $log['message'] . "'"; } return implode("\n", $output); diff --git a/Tasks/storage.php b/storage.php similarity index 100% rename from Tasks/storage.php rename to storage.php diff --git a/supervisor.php b/supervisor.php new file mode 100644 index 0000000..8fc8427 --- /dev/null +++ b/supervisor.php @@ -0,0 +1,70 @@ +setTimeZone('Europe/Amsterdam'); +$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp'); +$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log'); + +// Add Async library +$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks'); + +// Debug +$configurator->enableDebugMode()->setDebugAddress('ALL'); + +// Create container +$container = $configurator->createContainer(); + +// Add lib +Logger::enableScreenLog(); + +// RUN THE APP +/** @var Tasks $lib */ +$lib = $container->libraries->get('async'); + +$supervisor = $lib->getSuperVisor(); +while ($supervisor->cycle() === SuperVisor::RUNNING) { + usleep(250000); +} \ No newline at end of file diff --git a/Tasks/worker.php b/worker.php similarity index 52% rename from Tasks/worker.php rename to worker.php index d9cdf87..3439954 100644 --- a/Tasks/worker.php +++ b/worker.php @@ -1,10 +1,10 @@ setTimeZone('Europe/Amsterdam'); -$configurator->setTempDirectory(dirname(__DIR__) . DIRECTORY_SEPARATOR . 'temp'); -$configurator->setLogDirectory(dirname(__DIR__). DIRECTORY_SEPARATOR . 'log'); +$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp'); +$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log'); -// Enable components -// WebComponent -$webAppComponent = new WebAppComponent(); -$configurator->addComponent($webAppComponent); - -// Add directories -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Config', 'config', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Helper', 'helpers', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Library', 'libraries', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Plugin', 'plugins', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Controller', 'controllers', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'View', 'views', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Model', 'models', Priority::HIGH); -$configurator->addDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'Layout', 'layouts', Priority::NORMAL); +// Add Async library +$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks'); // Debug $configurator->enableDebugMode()->setDebugAddress('ALL'); + +// Create container $container = $configurator->createContainer(); -//\FuzeWorks\Logger::enableScreenLog(); // Prepare arguments $script = array_shift($argv); @@ -77,6 +63,6 @@ $post = ($mode === 'post' ? true : false); // RUN THE APP /** @var Tasks $lib */ -$lib = $container->libraries->get('tasks'); +$lib = $container->libraries->get('async'); $worker = $lib->getWorker(); $worker->run($taskID, $post); \ No newline at end of file