diff --git a/Dockerfile b/Dockerfile index 5a4a7d0..f42a7bf 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,5 +3,9 @@ FROM php:7.3-cli-buster RUN apt-get update &&\ apt-get install --no-install-recommends --assume-yes --quiet procps ca-certificates curl git &&\ rm -rf /var/lib/apt/lists/* -# PDO -RUN docker-php-ext-install pdo_mysql \ No newline at end of file + +# Install Redis +RUN pecl install redis-5.1.1 && docker-php-ext-enable redis + +# Install Composer +RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer \ No newline at end of file diff --git a/bin/supervisor b/bin/supervisor index 62f174e..4ffa970 100644 --- a/bin/supervisor +++ b/bin/supervisor @@ -77,8 +77,8 @@ if (!isset($arguments['bootstrap']) || empty($arguments['bootstrap'])) // Load the file. If it doesn't exist, fail. -$file = $arguments['bootstrap']; -if (!file_exists($file)) +$bootstrap = $arguments['bootstrap']; +if (!file_exists($bootstrap)) { fwrite(STDERR, "Could not load supervisor. Provided bootstrap doesn't exist."); die(1); @@ -86,7 +86,7 @@ if (!file_exists($file)) // Load the bootstrap /** @var Factory $container */ -$container = require($file); +$container = require($bootstrap); // Check if container is a Factory if (!$container instanceof Factory) @@ -108,7 +108,7 @@ try { // And finally, run the supervisor try { - $supervisor = $lib->getSuperVisor(); + $supervisor = $lib->getSuperVisor($bootstrap); while ($supervisor->cycle() === SuperVisor::RUNNING) { usleep(250000); } diff --git a/composer.json b/composer.json index 9698f4f..2e5c849 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,11 @@ "require": { "php": ">=7.2.0", "fuzeworks/core": "~1.2.0", - "ext-json": "*" + "ext-json": "*", + "ext-redis": "*" + }, + "require-dev": { + "fuzeworks/tracycomponent": "~1.2.0" }, "autoload": { "psr-4": { diff --git a/src/FuzeWorks/Async/Executors/ShellExecutor.php b/src/FuzeWorks/Async/Executors/ShellExecutor.php index 30c5556..87df219 100644 --- a/src/FuzeWorks/Async/Executors/ShellExecutor.php +++ b/src/FuzeWorks/Async/Executors/ShellExecutor.php @@ -45,22 +45,25 @@ class ShellExecutor implements Executor private $binary; private $worker; + private $bootstrapFile; private $stdout = "> /dev/null"; private $stderr = "2> /dev/null"; /** * ShellExecutor constructor. * + * @param string $bootstrapFile * @param array $parameters * @throws TasksException */ - public function __construct(array $parameters) + public function __construct(string $bootstrapFile, array $parameters) { // Fetch workerFile $workerFile = $parameters['workerFile']; // First determine the PHP binary $this->binary = PHP_BINDIR . DS . 'php'; + $this->bootstrapFile = $bootstrapFile; if (!file_exists($workerFile)) throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist."); @@ -80,7 +83,7 @@ class ShellExecutor implements Executor public function startTask(Task $task, bool $post = false): Task { // First prepare the command used to spawn workers - $commandString = "$this->binary $this->worker -t %s ".($post ? 'p' : '')." $this->stdout $this->stderr & echo $!"; + $commandString = "$this->binary $this->worker --bootstrap=".$this->bootstrapFile." -t %s ".($post ? 'p' : '')." $this->stdout $this->stderr & echo $!"; // Then execute the command using the base64_encoded string of the taskID $output = $this->shellExec($commandString, [base64_encode($task->getId())]); diff --git a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php index e70ac1f..0408ec8 100644 --- a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php +++ b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php @@ -80,7 +80,7 @@ class ParallelSuperVisor implements SuperVisor for ($i=0;$itasks);$i++) { $task = $this->tasks[$i]; - + // PENDING: should start if not constrained if ($task->getStatus() === Task::PENDING) { @@ -97,6 +97,7 @@ class ParallelSuperVisor implements SuperVisor // Modify the task in TaskStorage $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } // DELAYED: If task is delayed, and enough time has passed, change the status back to pending @@ -104,6 +105,7 @@ class ParallelSuperVisor implements SuperVisor { $task->setStatus(Task::PENDING); $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } // CANCELLED/COMPLETED: remove the task if requested to do so @@ -139,6 +141,7 @@ class ParallelSuperVisor implements SuperVisor // If any changes have been made, they should be written to TaskStorage $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } // FAILED: if a process has failed, attempt to rety if requested to do so @@ -161,6 +164,7 @@ class ParallelSuperVisor implements SuperVisor $task = $this->executor->startTask($task); $task->setStatus(Task::RUNNING); $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); continue; } } @@ -176,6 +180,7 @@ class ParallelSuperVisor implements SuperVisor $task->setStatus(Task::CANCELLED); $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } // SUCCESS: if a task has succeeded, see if it needs a postHandler @@ -191,6 +196,7 @@ class ParallelSuperVisor implements SuperVisor $task->setStatus(Task::COMPLETED); $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } // POST: when a task is currently running in it's postHandler @@ -226,6 +232,7 @@ class ParallelSuperVisor implements SuperVisor // If any changes have been made, they should be written to TaskStorage $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } } @@ -263,6 +270,7 @@ class ParallelSuperVisor implements SuperVisor // Save changes to TaskStorage $this->taskStorage->modifyTask($task); + fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus())); } } diff --git a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php new file mode 100644 index 0000000..8f0d000 --- /dev/null +++ b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php @@ -0,0 +1,278 @@ +conn = new Redis(); + + // Afterwards connect to server + $socketType = $parameters['socket_type']; + if ($socketType == 'unix') + $success = $this->conn->connect($parameters['socket']); + elseif ($socketType == 'tcp') + $success = $this->conn->connect($parameters['host'], $parameters['port'], $parameters['timeout']); + else + $success = false; + + // If unsuccessful, return false + if (!$success) + throw new TasksException("Could not construct RedisTaskStorage. Failed to connect."); + + // Otherwise attempt authentication, if needed + if (isset($parameters['password']) && !$this->conn->auth($parameters['password'])) + throw new TasksException("Could not construct RedisTaskStorage. Authentication failure."); + } catch (RedisException $e) { + throw new TasksException("Could not construct RedisTaskStorage. RedisException thrown: '" . $e->getMessage() . "'"); + } + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function addTask(Task $task): bool + { + // Check if the task doesn't exist yet + $taskId = $task->getId(); + + // Query the index + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if ($isMember) + throw new TasksException("Could not add Task to TaskStorage. Task '$taskId' already exists."); + + // Serialize the task and save it + $taskData = serialize($task); + $this->conn->set($this->key_prefix . $taskId, $taskData); + $this->conn->sAdd($this->indexSet, $taskId); + + return true; + } + + /** + * @inheritDoc + */ + public function readTasks(): array + { + return $this->refreshTasks(); + } + + /** + * @inheritDoc + */ + public function refreshTasks() + { + // First fetch an array of all tasks in the set + $taskList = $this->conn->sMembers($this->indexSet); + + // Go over each taskId and fetch the specific task + $tasks = []; + foreach ($taskList as $taskId) + $tasks[] = unserialize($this->conn->get($this->key_prefix . $taskId)); + + return $tasks; + } + + /** + * @inheritDoc + */ + public function getTaskById(string $identifier): Task + { + // Query the index + $isMember = $this->conn->sIsMember($this->indexSet, $identifier); + if (!$isMember) + throw new TasksException("Could not get task by ID. Task not found."); + + // Fetch the task + /** @var Task $task */ + $task = unserialize($this->conn->get($this->key_prefix . $identifier)); + + // Return the task + return $task; + } + + /** + * @inheritDoc + */ + public function modifyTask(Task $task): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if it exists + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not modify task. Task '$taskId' already exists."); + + // And write the data + $taskData = serialize($task); + return $this->conn->set($this->key_prefix . $taskId, $taskData); + } + + /** + * @inheritDoc + * @throws TasksException + */ + public function deleteTask(Task $task): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if it exists + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not modify task. Task '$taskId' already exists."); + + // Delete the key + $this->conn->del($this->key_prefix . $taskId); + $this->conn->sRem($this->indexSet, $taskId); + + // Remove all task output and post output + $settings = $task->getRetrySettings(); + $maxRetries = $settings['maxRetries']; + for ($i=0;$i<=$maxRetries;$i++) + { + // First remove all possible task output + if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $i)) + $this->conn->del($this->key_prefix . $taskId . '_output_' . $i); + + if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $i)) + $this->conn->del($this->key_prefix . $taskId . '_post_' . $i); + } + + return true; + } + + /** + * @inheritDoc + */ + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the key already exists + if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) + throw new TasksException("Could not write task output. Output already written."); + + // Prepare contents + $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; + $data = serialize($contents); + + // Write contents + return $this->conn->set($this->key_prefix . $taskId . '_output_' . $attempt, $data); + } + + /** + * @inheritDoc + */ + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the key already exists + if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) + throw new TasksException("Could not write post output. Output already written."); + + // Prepare contents + $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; + $data = serialize($contents); + + // Write contents + return $this->conn->set($this->key_prefix . $taskId . '_post_' . $attempt, $data); + } + + /** + * @inheritDoc + */ + public function readTaskOutput(Task $task, int $attempt = 0): ?array + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the key already exists + if (!$this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) + return null; + + // Load and convert the data + $data = $this->conn->get($this->key_prefix . $taskId . '_output_' . $attempt); + $data = unserialize($data); + + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } + + /** + * @inheritDoc + */ + public function readPostOutput(Task $task, int $attempt = 0): ?array + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the key already exists + if (!$this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) + return null; + + // Load and convert the data + $data = $this->conn->get($this->key_prefix . $taskId . '_post_' . $attempt); + $data = unserialize($data); + + return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; + } +} \ No newline at end of file diff --git a/src/FuzeWorks/Async/Tasks.php b/src/FuzeWorks/Async/Tasks.php index 18df927..fde3962 100644 --- a/src/FuzeWorks/Async/Tasks.php +++ b/src/FuzeWorks/Async/Tasks.php @@ -75,14 +75,16 @@ class Tasks implements iLibrary } /** + * @param string $bootstrapFile + * @return SuperVisor * @throws TasksException */ - public function getSuperVisor(): SuperVisor + public function getSuperVisor(string $bootstrapFile): SuperVisor { $cfg = $this->cfg->get('SuperVisor'); $class = 'FuzeWorks\Async\Supervisors\\' . $cfg['type']; $parameters = isset($cfg['parameters']) && is_array($cfg['parameters']) ? $cfg['parameters'] : []; - array_unshift($parameters, $this->getTaskStorage(), $this->getExecutor()); + array_unshift($parameters, $this->getTaskStorage(), $this->getExecutor($bootstrapFile)); if (!class_exists($class, true)) throw new TasksException("Could not get SuperVisor. Type of '$class' not found."); @@ -126,10 +128,11 @@ class Tasks implements iLibrary /** * Fetch the Executor based on the configured type * + * @param string $bootstrapFile * @return Executor * @throws TasksException */ - protected function getExecutor(): Executor + protected function getExecutor(string $bootstrapFile): Executor { $cfg = $this->cfg->get('Executor'); $class = 'FuzeWorks\Async\Executors\\' . $cfg['type']; @@ -137,7 +140,7 @@ class Tasks implements iLibrary if (!class_exists($class, true)) throw new TasksException("Could not get Executor. Type of '$class' not found."); - $object = new $class($parameters); + $object = new $class($bootstrapFile, $parameters); if (!$object instanceof Executor) throw new TasksException("Could not get Executor. Type '$class' is not instanceof Executor.");