Changed the way task output and post output is saved.

Redis now saves all output for a task within a hash. This hash contains individual tasks and all its output.
Attempts also start at 1, since that makes most sense for this context. When output is written, the TaskStorage must figure out at which attempt the Task is.
This commit is contained in:
Abel Hoogeveen 2020-06-04 21:29:37 +02:00
parent 902693dbbe
commit 4f39b0bec3
No known key found for this signature in database
GPG Key ID: 96C2234920BF4292
6 changed files with 118 additions and 131 deletions

View File

@ -115,13 +115,13 @@ class ShellWorker
// If the task failed, write so to task storage, based on whether this is a post request or not
if (!$success && $post)
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries());
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED);
elseif (!$success && !$post)
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries());
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED);
elseif ($success && $post)
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries());
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS);
else
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries());
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS);
// And write the final output
$this->output((string) $output, $errors);
@ -166,9 +166,9 @@ class ShellWorker
try {
// Write to TaskStorage
if (!$this->post)
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries());
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED);
else
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries());
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED);
} catch (TasksException $e) {
// Ignore
}

View File

@ -118,7 +118,7 @@ class ParallelSuperVisor implements SuperVisor
elseif ($task->getStatus() === Task::RUNNING)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task, $task->getRetries());
$output = $this->taskStorage->readTaskOutput($task);
$hasOutput = !is_null($output);
// If nothing is found, the process has crashed and status PFAILED should be set
@ -204,7 +204,7 @@ class ParallelSuperVisor implements SuperVisor
elseif ($task->getStatus() === Task::POST)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readPostOutput($task, $task->getRetries());
$output = $this->taskStorage->readPostOutput($task);
$hasOutput = !is_null($output);
// If a task is not running and has no output, an error has occurred

View File

@ -111,7 +111,7 @@ interface TaskStorage
* @return bool
* @throws TasksException
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool;
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool;
/**
* Write the output of the postHandler into TaskStorage
@ -126,7 +126,7 @@ interface TaskStorage
* @return bool
* @throws TasksException
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool;
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool;
/**
* Read the task output from taskStorage.
@ -144,7 +144,7 @@ interface TaskStorage
* @param int $attempt
* @return array|null
*/
public function readTaskOutput(Task $task, int $attempt = 0): ?array;
public function readTaskOutput(Task $task, int $attempt = 1): ?array;
/**
* Read the output from the postHandler
@ -162,7 +162,7 @@ interface TaskStorage
* @param int $attempt
* @return array|null
*/
public function readPostOutput(Task $task, int $attempt = 0): ?array;
public function readPostOutput(Task $task, int $attempt = 1): ?array;
/**
* Reset the TaskStorage.

View File

@ -177,11 +177,20 @@ class DummyTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool
{
// First check if the task exists
$task = $this->getTaskById($task->getId());
// Set the attempt number
if (!isset($this->taskOutput[$task->getId()]['outAttempts']))
{
$this->taskOutput[$task->getId()]['outAttempts'] = 1;
$attempt = $this->taskOutput[$task->getId()]['outAttempts'];
}
else
$attempt = $this->taskOutput[$task->getId()]['outAttempts']++;
if (isset($this->taskOutput[$task->getId()]['task'][$attempt]))
throw new TasksException("Could not write task output. Output already written.");
@ -197,11 +206,20 @@ class DummyTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool
{
// First check if the task exists
$task = $this->getTaskById($task->getId());
// Set the attempt number
if (!isset($this->taskOutput[$task->getId()]['postAttempts']))
{
$this->taskOutput[$task->getId()]['postAttempts'] = 1;
$attempt = $this->taskOutput[$task->getId()]['postAttempts'];
}
else
$attempt = $this->taskOutput[$task->getId()]['postAttempts']++;
if (isset($this->taskOutput[$task->getId()]['post'][$attempt]))
throw new TasksException("Could not write task post output. Output already written.");
@ -217,7 +235,7 @@ class DummyTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function readTaskOutput(Task $task, int $attempt = 0): ?array
public function readTaskOutput(Task $task, int $attempt = 1): ?array
{
if (isset($this->taskOutput[$task->getId()]['task'][$attempt]))
return $this->taskOutput[$task->getId()]['task'][$attempt];
@ -228,7 +246,7 @@ class DummyTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function readPostOutput(Task $task, int $attempt = 0): ?array
public function readPostOutput(Task $task, int $attempt = 1): ?array
{
if (isset($this->taskOutput[$task->getId()]['post'][$attempt]))
return $this->taskOutput[$task->getId()]['post'][$attempt];

View File

@ -104,9 +104,14 @@ class RedisTaskStorage implements TaskStorage
// Serialize the task and save it
$taskData = serialize($task);
$this->conn->set($this->key_prefix . $taskId, $taskData);
// Register the task
$this->conn->sAdd($this->indexSet, $taskId);
// And create a hash for it
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
return false;
return true;
}
@ -129,7 +134,7 @@ class RedisTaskStorage implements TaskStorage
// Go over each taskId and fetch the specific task
$tasks = [];
foreach ($taskList as $taskId)
$tasks[] = unserialize($this->conn->get($this->key_prefix . $taskId));
$tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data'));
return $tasks;
}
@ -146,7 +151,7 @@ class RedisTaskStorage implements TaskStorage
// Fetch the task
/** @var Task $task */
$task = unserialize($this->conn->get($this->key_prefix . $identifier));
$task = unserialize($this->conn->hGet($this->key_prefix . $identifier, 'data'));
// Return the task
return $task;
@ -163,7 +168,7 @@ class RedisTaskStorage implements TaskStorage
// Check if it exists
$isMember = $this->conn->sIsMember($this->indexSet, $taskId);
if (!$isMember)
throw new TasksException("Could not modify task. Task '$taskId' already exists.");
throw new TasksException("Could not modify task. Task '$taskId' does not exists.");
// Fire the TaskModifyEvent
/** @var TaskModifyEvent $event */
@ -176,7 +181,10 @@ class RedisTaskStorage implements TaskStorage
// And write the data
$taskData = serialize($event->getTask());
return $this->conn->set($this->key_prefix . $taskId, $taskData);
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
return false;
return true;
}
/**
@ -191,24 +199,40 @@ class RedisTaskStorage implements TaskStorage
// Check if it exists
$isMember = $this->conn->sIsMember($this->indexSet, $taskId);
if (!$isMember)
throw new TasksException("Could not modify task. Task '$taskId' already exists.");
throw new TasksException("Could not delete task. Task '$taskId' does not exists.");
// Delete the key
$this->conn->del($this->key_prefix . $taskId);
// Delete the task from the index
$this->conn->sRem($this->indexSet, $taskId);
// Remove all task output and post output
$settings = $task->getRetrySettings();
$maxRetries = $settings['maxRetries'];
for ($i=0;$i<=$maxRetries;$i++)
{
// First remove all possible task output
if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $i))
$this->conn->del($this->key_prefix . $taskId . '_output_' . $i);
// Delete the task itself
if ($this->conn->del($this->key_prefix . $taskId) > 0)
return true;
if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $i))
$this->conn->del($this->key_prefix . $taskId . '_post_' . $i);
}
return false;
}
/**
* @inheritDoc
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool
{
// First get the task ID
$taskId = $task->getId();
// Check if the task exists
$isMember = $this->conn->sIsMember($this->indexSet, $taskId);
if (!$isMember)
throw new TasksException("Could not write task output. Task '$taskId' not found.");
// Prepare the data
$contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode];
// Determine the attempt number
$attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskOutputAttempts', 1);
// Then write this output
if ($this->conn->hSet($this->key_prefix . $taskId, 'output' . $attempt, serialize($contents)) === FALSE)
return false;
return true;
}
@ -216,63 +240,43 @@ class RedisTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool
{
// First get the task ID
$taskId = $task->getId();
// Check if the task exists
$task = $this->getTaskById($taskId);
$isMember = $this->conn->sIsMember($this->indexSet, $taskId);
if (!$isMember)
throw new TasksException("Could not write post output. Task '$taskId' not found.");
// Check if the key already exists
if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt))
throw new TasksException("Could not write task output. Output already written.");
// Prepare the data
$contents = ['taskId' => $taskId, 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode];
// Prepare contents
$contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode];
$data = serialize($contents);
// Determine the attempt number
$attempt = $this->conn->hIncrBy($this->key_prefix . $taskId, 'taskPostAttempts', 1);
// Write contents
return $this->conn->set($this->key_prefix . $taskId . '_output_' . $attempt, $data);
// Then write this output
if ($this->conn->hSet($this->key_prefix . $taskId, 'postOutput' . $attempt, serialize($contents)) === FALSE)
return false;
return true;
}
/**
* @inheritDoc
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
public function readTaskOutput(Task $task, int $attempt = 1): ?array
{
// First get the task ID
$taskId = $task->getId();
// Check if the task exists
$task = $this->getTaskById($taskId);
// Check if the key already exists
if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt))
throw new TasksException("Could not write post output. Output already written.");
// Prepare contents
$contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode];
$data = serialize($contents);
// Write contents
return $this->conn->set($this->key_prefix . $taskId . '_post_' . $attempt, $data);
}
/**
* @inheritDoc
*/
public function readTaskOutput(Task $task, int $attempt = 0): ?array
{
// First get the task ID
$taskId = $task->getId();
// Check if the key already exists
if (!$this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt))
// Check if this output exists
if (!$this->conn->hExists($this->key_prefix . $taskId, 'output' . $attempt))
return null;
// Load and convert the data
$data = $this->conn->get($this->key_prefix . $taskId . '_output_' . $attempt);
$data = $this->conn->hGet($this->key_prefix . $taskId, 'output' . $attempt);
$data = unserialize($data);
return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']];
@ -281,17 +285,17 @@ class RedisTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function readPostOutput(Task $task, int $attempt = 0): ?array
public function readPostOutput(Task $task, int $attempt = 1): ?array
{
// First get the task ID
$taskId = $task->getId();
// Check if the key already exists
if (!$this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt))
// Check if this output exists
if (!$this->conn->hExists($this->key_prefix . $taskId, 'postOutput' . $attempt))
return null;
// Load and convert the data
$data = $this->conn->get($this->key_prefix . $taskId . '_post_' . $attempt);
$data = $this->conn->hGet($this->key_prefix . $taskId, 'postOutput' . $attempt);
$data = unserialize($data);
return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']];

View File

@ -315,10 +315,10 @@ class TaskStorageTest extends TestCase
// First write the task output
$this->taskStorage->addTask($dummyTask);
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0, 0));
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 0));
// Then try to read the output
$output = $this->taskStorage->readTaskOutput($dummyTask, 0);
$output = $this->taskStorage->readTaskOutput($dummyTask, 1);
$this->assertEquals('output', $output['output']);
$this->assertEquals('errors', $output['errors']);
$this->assertEquals(0, $output['statusCode']);
@ -346,26 +346,25 @@ class TaskStorageTest extends TestCase
$dummyTask = new Task('testWriteAndReadTaskOutputAttempts', new EmptyHandler());
$this->taskStorage->addTask($dummyTask);
// Write the different outputs. Done in a weird order to make sure the default is inserted not first or last
// to make sure the default is not selected by accident by the TaskStorage
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 102, 2));
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output0', 'errors0', 100, 0));
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 101, 1));
// Write the different outputs.
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output0', 'errors0', 100));
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output1', 'errors1', 101));
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output2', 'errors2', 102));
// Attempt to load the first output
$output0 = $this->taskStorage->readTaskOutput($dummyTask, 0);
$output0 = $this->taskStorage->readTaskOutput($dummyTask, 1);
$this->assertEquals('output0', $output0['output']);
$this->assertEquals('errors0', $output0['errors']);
$this->assertEquals(100, $output0['statusCode']);
// Attempt to load the second output
$output1 = $this->taskStorage->readTaskOutput($dummyTask, 1);
$output1 = $this->taskStorage->readTaskOutput($dummyTask, 2);
$this->assertEquals('output1', $output1['output']);
$this->assertEquals('errors1', $output1['errors']);
$this->assertEquals(101, $output1['statusCode']);
// Attempt to load the third output
$output2 = $this->taskStorage->readTaskOutput($dummyTask, 2);
$output2 = $this->taskStorage->readTaskOutput($dummyTask, 3);
$this->assertEquals('output2', $output2['output']);
$this->assertEquals('errors2', $output2['errors']);
$this->assertEquals(102, $output2['statusCode']);
@ -377,23 +376,6 @@ class TaskStorageTest extends TestCase
$this->assertEquals(100, $output['statusCode']);
}
/**
* @depends testWriteAndReadTaskOutput
*/
public function testWriteAndReadTaskOutputAlreadyExists()
{
// Prepare a dummy task
$dummyTask = new Task('testWriteAndReadTaskOutputAlreadyExists', new EmptyHandler());
$this->taskStorage->addTask($dummyTask);
// Write a first time
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 100, 0));
// And write it a second time
$this->expectException(TasksException::class);
$this->assertTrue($this->taskStorage->writeTaskOutput($dummyTask, 'output', 'errors', 100, 0));
}
/**
* @depends testWriteAndReadTaskOutput
*/
@ -418,10 +400,10 @@ class TaskStorageTest extends TestCase
$this->taskStorage->addTask($dummyTask);
// First write the task output
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 0, 0));
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'postOutput', 'errors', 0));
// Then try to read the output
$output = $this->taskStorage->readPostOutput($dummyTask, 0);
$output = $this->taskStorage->readPostOutput($dummyTask, 1);
$this->assertEquals('postOutput', $output['output']);
$this->assertEquals('errors', $output['errors']);
$this->assertEquals(0, $output['statusCode']);
@ -436,26 +418,26 @@ class TaskStorageTest extends TestCase
$dummyTask = new Task('testWriteAndReadTaskPostOutputAttempts', new EmptyHandler());
$this->taskStorage->addTask($dummyTask);
// Write the different outputs. Done in a weird order to make sure the default is inserted not first or last
// to make sure the default is not selected by accident by the TaskStorage
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 102, 2));
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output0', 'errors0', 100, 0));
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 101, 1));
// Write the different outputs
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output0', 'errors0', 100));
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output1', 'errors1', 101));
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output2', 'errors2', 102));
// Attempt to load the first output
$output0 = $this->taskStorage->readPostOutput($dummyTask, 0);
$output0 = $this->taskStorage->readPostOutput($dummyTask, 1);
$this->assertEquals('output0', $output0['output']);
$this->assertEquals('errors0', $output0['errors']);
$this->assertEquals(100, $output0['statusCode']);
// Attempt to load the second output
$output1 = $this->taskStorage->readPostOutput($dummyTask, 1);
$output1 = $this->taskStorage->readPostOutput($dummyTask, 2);
$this->assertEquals('output1', $output1['output']);
$this->assertEquals('errors1', $output1['errors']);
$this->assertEquals(101, $output1['statusCode']);
// Attempt to load the third output
$output2 = $this->taskStorage->readPostOutput($dummyTask, 2);
$output2 = $this->taskStorage->readPostOutput($dummyTask, 3);
$this->assertEquals('output2', $output2['output']);
$this->assertEquals('errors2', $output2['errors']);
$this->assertEquals(102, $output2['statusCode']);
@ -467,23 +449,6 @@ class TaskStorageTest extends TestCase
$this->assertEquals(100, $output['statusCode']);
}
/**
* @depends testWriteAndReadTaskPostOutput
*/
public function testWriteAndReadTaskPostOutputAlreadyExists()
{
// Prepare a dummy task
$dummyTask = new Task('testWriteAndReadTaskPostOutputAlreadyExists', new EmptyHandler());
$this->taskStorage->addTask($dummyTask);
// Write a first time
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output', 'errors', 100, 0));
// And write it a second time
$this->expectException(TasksException::class);
$this->assertTrue($this->taskStorage->writePostOutput($dummyTask, 'output', 'errors', 100, 0));
}
/**
* @depends testWriteAndReadTaskPostOutput
*/