From 4f39b0bec3c9133e16836c17efe2632c7b3a93a2 Mon Sep 17 00:00:00 2001 From: Abel Hoogeveen Date: Thu, 4 Jun 2020 21:29:37 +0200 Subject: [PATCH] Changed the way task output and post output is saved. Redis now saves all output for a task within a hash. This hash contains individual tasks and all its output. Attempts also start at 1, since that makes most sense for this context. When output is written, the TaskStorage must figure out at which attempt the Task is. --- src/FuzeWorks/Async/ShellWorker.php | 12 +- .../Async/Supervisors/ParallelSuperVisor.php | 4 +- src/FuzeWorks/Async/TaskStorage.php | 8 +- .../Async/TaskStorage/DummyTaskStorage.php | 26 +++- .../Async/TaskStorage/RedisTaskStorage.php | 126 +++++++++--------- test/base/TaskStorageTest.php | 73 +++------- 6 files changed, 118 insertions(+), 131 deletions(-) diff --git a/src/FuzeWorks/Async/ShellWorker.php b/src/FuzeWorks/Async/ShellWorker.php index 4a67902..6d8b452 100644 --- a/src/FuzeWorks/Async/ShellWorker.php +++ b/src/FuzeWorks/Async/ShellWorker.php @@ -115,13 +115,13 @@ class ShellWorker // If the task failed, write so to task storage, based on whether this is a post request or not if (!$success && $post) - $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED); elseif (!$success && !$post) - $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED); elseif ($success && $post) - $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries()); + $this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS); else - $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries()); + $this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS); // And write the final output $this->output((string) $output, $errors); @@ -166,9 +166,9 @@ class ShellWorker try { // Write to TaskStorage if (!$this->post) - $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED); else - $this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries()); + $this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED); } catch (TasksException $e) { // Ignore } diff --git a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php index f3a36d0..2d0425f 100644 --- a/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php +++ b/src/FuzeWorks/Async/Supervisors/ParallelSuperVisor.php @@ -118,7 +118,7 @@ class ParallelSuperVisor implements SuperVisor elseif ($task->getStatus() === Task::RUNNING) { $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readTaskOutput($task, $task->getRetries()); + $output = $this->taskStorage->readTaskOutput($task); $hasOutput = !is_null($output); // If nothing is found, the process has crashed and status PFAILED should be set @@ -204,7 +204,7 @@ class ParallelSuperVisor implements SuperVisor elseif ($task->getStatus() === Task::POST) { $isRunning = $this->executor->getTaskRunning($task); - $output = $this->taskStorage->readPostOutput($task, $task->getRetries()); + $output = $this->taskStorage->readPostOutput($task); $hasOutput = !is_null($output); // If a task is not running and has no output, an error has occurred diff --git a/src/FuzeWorks/Async/TaskStorage.php b/src/FuzeWorks/Async/TaskStorage.php index 1d18ec4..47db727 100644 --- a/src/FuzeWorks/Async/TaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage.php @@ -111,7 +111,7 @@ interface TaskStorage * @return bool * @throws TasksException */ - public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool; /** * Write the output of the postHandler into TaskStorage @@ -126,7 +126,7 @@ interface TaskStorage * @return bool * @throws TasksException */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool; + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool; /** * Read the task output from taskStorage. @@ -144,7 +144,7 @@ interface TaskStorage * @param int $attempt * @return array|null */ - public function readTaskOutput(Task $task, int $attempt = 0): ?array; + public function readTaskOutput(Task $task, int $attempt = 1): ?array; /** * Read the output from the postHandler @@ -162,7 +162,7 @@ interface TaskStorage * @param int $attempt * @return array|null */ - public function readPostOutput(Task $task, int $attempt = 0): ?array; + public function readPostOutput(Task $task, int $attempt = 1): ?array; /** * Reset the TaskStorage. diff --git a/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php index 60bd7c3..9d6e87d 100644 --- a/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/DummyTaskStorage.php @@ -177,11 +177,20 @@ class DummyTaskStorage implements TaskStorage /** * @inheritDoc */ - public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool { // First check if the task exists $task = $this->getTaskById($task->getId()); + // Set the attempt number + if (!isset($this->taskOutput[$task->getId()]['outAttempts'])) + { + $this->taskOutput[$task->getId()]['outAttempts'] = 1; + $attempt = $this->taskOutput[$task->getId()]['outAttempts']; + } + else + $attempt = $this->taskOutput[$task->getId()]['outAttempts']++; + if (isset($this->taskOutput[$task->getId()]['task'][$attempt])) throw new TasksException("Could not write task output. Output already written."); @@ -197,11 +206,20 @@ class DummyTaskStorage implements TaskStorage /** * @inheritDoc */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool { // First check if the task exists $task = $this->getTaskById($task->getId()); + // Set the attempt number + if (!isset($this->taskOutput[$task->getId()]['postAttempts'])) + { + $this->taskOutput[$task->getId()]['postAttempts'] = 1; + $attempt = $this->taskOutput[$task->getId()]['postAttempts']; + } + else + $attempt = $this->taskOutput[$task->getId()]['postAttempts']++; + if (isset($this->taskOutput[$task->getId()]['post'][$attempt])) throw new TasksException("Could not write task post output. Output already written."); @@ -217,7 +235,7 @@ class DummyTaskStorage implements TaskStorage /** * @inheritDoc */ - public function readTaskOutput(Task $task, int $attempt = 0): ?array + public function readTaskOutput(Task $task, int $attempt = 1): ?array { if (isset($this->taskOutput[$task->getId()]['task'][$attempt])) return $this->taskOutput[$task->getId()]['task'][$attempt]; @@ -228,7 +246,7 @@ class DummyTaskStorage implements TaskStorage /** * @inheritDoc */ - public function readPostOutput(Task $task, int $attempt = 0): ?array + public function readPostOutput(Task $task, int $attempt = 1): ?array { if (isset($this->taskOutput[$task->getId()]['post'][$attempt])) return $this->taskOutput[$task->getId()]['post'][$attempt]; diff --git a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php index 8ebb22d..a24ca82 100644 --- a/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php +++ b/src/FuzeWorks/Async/TaskStorage/RedisTaskStorage.php @@ -104,9 +104,14 @@ class RedisTaskStorage implements TaskStorage // Serialize the task and save it $taskData = serialize($task); - $this->conn->set($this->key_prefix . $taskId, $taskData); + + // Register the task $this->conn->sAdd($this->indexSet, $taskId); + // And create a hash for it + if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) + return false; + return true; } @@ -129,7 +134,7 @@ class RedisTaskStorage implements TaskStorage // Go over each taskId and fetch the specific task $tasks = []; foreach ($taskList as $taskId) - $tasks[] = unserialize($this->conn->get($this->key_prefix . $taskId)); + $tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data')); return $tasks; } @@ -146,7 +151,7 @@ class RedisTaskStorage implements TaskStorage // Fetch the task /** @var Task $task */ - $task = unserialize($this->conn->get($this->key_prefix . $identifier)); + $task = unserialize($this->conn->hGet($this->key_prefix . $identifier, 'data')); // Return the task return $task; @@ -163,7 +168,7 @@ class RedisTaskStorage implements TaskStorage // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) - throw new TasksException("Could not modify task. Task '$taskId' already exists."); + throw new TasksException("Could not modify task. Task '$taskId' does not exists."); // Fire the TaskModifyEvent /** @var TaskModifyEvent $event */ @@ -176,7 +181,10 @@ class RedisTaskStorage implements TaskStorage // And write the data $taskData = serialize($event->getTask()); - return $this->conn->set($this->key_prefix . $taskId, $taskData); + if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) + return false; + + return true; } /** @@ -191,24 +199,40 @@ class RedisTaskStorage implements TaskStorage // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) - throw new TasksException("Could not modify task. Task '$taskId' already exists."); + throw new TasksException("Could not delete task. Task '$taskId' does not exists."); - // Delete the key - $this->conn->del($this->key_prefix . $taskId); + // Delete the task from the index $this->conn->sRem($this->indexSet, $taskId); - // Remove all task output and post output - $settings = $task->getRetrySettings(); - $maxRetries = $settings['maxRetries']; - for ($i=0;$i<=$maxRetries;$i++) - { - // First remove all possible task output - if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $i)) - $this->conn->del($this->key_prefix . $taskId . '_output_' . $i); + // Delete the task itself + if ($this->conn->del($this->key_prefix . $taskId) > 0) + return true; - if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $i)) - $this->conn->del($this->key_prefix . $taskId . '_post_' . $i); - } + return false; + } + + /** + * @inheritDoc + */ + public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool + { + // First get the task ID + $taskId = $task->getId(); + + // Check if the task exists + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not write task output. Task '$taskId' not found."); + + // Prepare the data + $contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; + + // Determine the attempt number + $attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskOutputAttempts', 1); + + // Then write this output + if ($this->conn->hSet($this->key_prefix . $taskId, 'output' . $attempt, serialize($contents)) === FALSE) + return false; return true; } @@ -216,63 +240,43 @@ class RedisTaskStorage implements TaskStorage /** * @inheritDoc */ - public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool { // First get the task ID $taskId = $task->getId(); // Check if the task exists - $task = $this->getTaskById($taskId); + $isMember = $this->conn->sIsMember($this->indexSet, $taskId); + if (!$isMember) + throw new TasksException("Could not write post output. Task '$taskId' not found."); - // Check if the key already exists - if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) - throw new TasksException("Could not write task output. Output already written."); + // Prepare the data + $contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - // Prepare contents - $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - $data = serialize($contents); + // Determine the attempt number + $attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskPostAttempts', 1); - // Write contents - return $this->conn->set($this->key_prefix . $taskId . '_output_' . $attempt, $data); + // Then write this output + if ($this->conn->hSet($this->key_prefix . $taskId, 'postOutput' . $attempt, serialize($contents)) === FALSE) + return false; + + return true; } /** * @inheritDoc */ - public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool + public function readTaskOutput(Task $task, int $attempt = 1): ?array { // First get the task ID $taskId = $task->getId(); - // Check if the task exists - $task = $this->getTaskById($taskId); - - // Check if the key already exists - if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) - throw new TasksException("Could not write post output. Output already written."); - - // Prepare contents - $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; - $data = serialize($contents); - - // Write contents - return $this->conn->set($this->key_prefix . $taskId . '_post_' . $attempt, $data); - } - - /** - * @inheritDoc - */ - public function readTaskOutput(Task $task, int $attempt = 0): ?array - { - // First get the task ID - $taskId = $task->getId(); - - // Check if the key already exists - if (!$this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'output' . $attempt)) return null; // Load and convert the data - $data = $this->conn->get($this->key_prefix . $taskId . '_output_' . $attempt); + $data = $this->conn->hGet($this->key_prefix . $taskId, 'output' . $attempt); $data = unserialize($data); return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; @@ -281,17 +285,17 @@ class RedisTaskStorage implements TaskStorage /** * @inheritDoc */ - public function readPostOutput(Task $task, int $attempt = 0): ?array + public function readPostOutput(Task $task, int $attempt = 1): ?array { // First get the task ID $taskId = $task->getId(); - - // Check if the key already exists - if (!$this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) + + // Check if this output exists + if (!$this->conn->hExists($this->key_prefix . $taskId, 'postOutput' . $attempt)) return null; // Load and convert the data - $data = $this->conn->get($this->key_prefix . $taskId . '_post_' . $attempt); + $data = $this->conn->hGet($this->key_prefix . $taskId, 'postOutput' . $attempt); $data = unserialize($data); return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; diff --git a/test/base/TaskStorageTest.php b/test/base/TaskStorageTest.php index 27bf978..bcaf999 100644 --- a/test/base/TaskStorageTest.php +++ b/test/base/TaskStorageTest.php @@ -315,10 +315,10 @@ class TaskStorageTest extends TestCase // First write the task output $this->taskStorage->addTask($dummyTask); - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0, 0)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0)); // Then try to read the output - $output = $this->taskStorage->readTaskOutput($dummyTask, 0); + $output = $this->taskStorage->readTaskOutput($dummyTask, 1); $this->assertEquals('output', $output['output']); $this->assertEquals('errors', $output['errors']); $this->assertEquals(0, $output['statusCode']); @@ -346,26 +346,25 @@ class TaskStorageTest extends TestCase $dummyTask = new Task('testWriteAndReadTaskOutputAttempts', new EmptyHandler()); $this->taskStorage->addTask($dummyTask); - // Write the different outputs. Done in a weird order to make sure the default is inserted not first or last - // to make sure the default is not selected by accident by the TaskStorage - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 102, 2)); - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output0', 'errors0', 100, 0)); - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 101, 1)); + // Write the different outputs. + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output0', 'errors0', 100)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 101)); + $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 102)); // Attempt to load the first output - $output0 = $this->taskStorage->readTaskOutput($dummyTask, 0); + $output0 = $this->taskStorage->readTaskOutput($dummyTask, 1); $this->assertEquals('output0', $output0['output']); $this->assertEquals('errors0', $output0['errors']); $this->assertEquals(100, $output0['statusCode']); // Attempt to load the second output - $output1 = $this->taskStorage->readTaskOutput($dummyTask, 1); + $output1 = $this->taskStorage->readTaskOutput($dummyTask, 2); $this->assertEquals('output1', $output1['output']); $this->assertEquals('errors1', $output1['errors']); $this->assertEquals(101, $output1['statusCode']); // Attempt to load the third output - $output2 = $this->taskStorage->readTaskOutput($dummyTask, 2); + $output2 = $this->taskStorage->readTaskOutput($dummyTask, 3); $this->assertEquals('output2', $output2['output']); $this->assertEquals('errors2', $output2['errors']); $this->assertEquals(102, $output2['statusCode']); @@ -377,23 +376,6 @@ class TaskStorageTest extends TestCase $this->assertEquals(100, $output['statusCode']); } - /** - * @depends testWriteAndReadTaskOutput - */ - public function testWriteAndReadTaskOutputAlreadyExists() - { - // Prepare a dummy task - $dummyTask = new Task('testWriteAndReadTaskOutputAlreadyExists', new EmptyHandler()); - $this->taskStorage->addTask($dummyTask); - - // Write a first time - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 100, 0)); - - // And write it a second time - $this->expectException(TasksException::class); - $this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 100, 0)); - } - /** * @depends testWriteAndReadTaskOutput */ @@ -418,10 +400,10 @@ class TaskStorageTest extends TestCase $this->taskStorage->addTask($dummyTask); // First write the task output - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 0, 0)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 0)); // Then try to read the output - $output = $this->taskStorage->readPostOutput($dummyTask, 0); + $output = $this->taskStorage->readPostOutput($dummyTask, 1); $this->assertEquals('postOutput', $output['output']); $this->assertEquals('errors', $output['errors']); $this->assertEquals(0, $output['statusCode']); @@ -436,26 +418,26 @@ class TaskStorageTest extends TestCase $dummyTask = new Task('testWriteAndReadTaskPostOutputAttempts', new EmptyHandler()); $this->taskStorage->addTask($dummyTask); - // Write the different outputs. Done in a weird order to make sure the default is inserted not first or last - // to make sure the default is not selected by accident by the TaskStorage - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 102, 2)); - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output0', 'errors0', 100, 0)); - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 101, 1)); + // Write the different outputs + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output0', 'errors0', 100)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 101)); + $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 102)); + // Attempt to load the first output - $output0 = $this->taskStorage->readPostOutput($dummyTask, 0); + $output0 = $this->taskStorage->readPostOutput($dummyTask, 1); $this->assertEquals('output0', $output0['output']); $this->assertEquals('errors0', $output0['errors']); $this->assertEquals(100, $output0['statusCode']); // Attempt to load the second output - $output1 = $this->taskStorage->readPostOutput($dummyTask, 1); + $output1 = $this->taskStorage->readPostOutput($dummyTask, 2); $this->assertEquals('output1', $output1['output']); $this->assertEquals('errors1', $output1['errors']); $this->assertEquals(101, $output1['statusCode']); // Attempt to load the third output - $output2 = $this->taskStorage->readPostOutput($dummyTask, 2); + $output2 = $this->taskStorage->readPostOutput($dummyTask, 3); $this->assertEquals('output2', $output2['output']); $this->assertEquals('errors2', $output2['errors']); $this->assertEquals(102, $output2['statusCode']); @@ -467,23 +449,6 @@ class TaskStorageTest extends TestCase $this->assertEquals(100, $output['statusCode']); } - /** - * @depends testWriteAndReadTaskPostOutput - */ - public function testWriteAndReadTaskPostOutputAlreadyExists() - { - // Prepare a dummy task - $dummyTask = new Task('testWriteAndReadTaskPostOutputAlreadyExists', new EmptyHandler()); - $this->taskStorage->addTask($dummyTask); - - // Write a first time - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output', 'errors', 100, 0)); - - // And write it a second time - $this->expectException(TasksException::class); - $this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output', 'errors', 100, 0)); - } - /** * @depends testWriteAndReadTaskPostOutput */