Implemented Parent Handlers.
continuous-integration/drone/push Build is passing Details

Parent Handlers can be stacked to run in succession. Output is transfered as input into the child handler which can continue with it. If the parent Handler fails, all Child handlers also fail.
This commit is contained in:
Abel Hoogeveen 2020-06-03 17:00:44 +02:00
parent 4555957292
commit 902693dbbe
No known key found for this signature in database
GPG Key ID: 96C2234920BF4292
13 changed files with 386 additions and 67 deletions

View File

@ -43,11 +43,11 @@ use FuzeWorks\Exception\LibraryException;
use FuzeWorks\Factory;
// First perform a PHP version check
if (version_compare('7.1.0', PHP_VERSION, '>')) {
if (version_compare('7.3.0', PHP_VERSION, '>')) {
fwrite(
STDERR,
sprintf(
'FuzeWorks Async requires PHP 7.1 or higher.' . PHP_EOL .
'FuzeWorks Async requires PHP 7.3 or higher.' . PHP_EOL .
'You are using PHP %s (%s).' . PHP_EOL,
PHP_VERSION,
PHP_BINARY

View File

@ -42,11 +42,11 @@ use FuzeWorks\Factory;
// First perform a PHP version check
if (version_compare('7.1.0', PHP_VERSION, '>')) {
if (version_compare('7.3.0', PHP_VERSION, '>')) {
fwrite(
STDERR,
sprintf(
'FuzeWorks Async requires PHP 7.1 or higher.' . PHP_EOL .
'FuzeWorks Async requires PHP 7.3 or higher.' . PHP_EOL .
'You are using PHP %s (%s).' . PHP_EOL,
PHP_VERSION,
PHP_BINARY
@ -119,7 +119,7 @@ $post = isset($arguments['p']);
// RUN THE APP
$worker = $lib->getWorker();
$worker->run($taskID, $post);
$worker->runTaskById($taskID, $post);
fwrite(STDOUT,'Finished task \'' . $taskID . "'");
?>

View File

@ -56,6 +56,9 @@ class ShellExecutor implements Executor
*/
public function __construct(array $parameters)
{
if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile']))
throw new TasksException("Could not construct ShellExecutor. Parameter failure.");
// Fetch workerFile
$this->worker = $parameters['workerFile'];
if (!file_exists($this->worker))

View File

@ -38,6 +38,27 @@ namespace FuzeWorks\Async;
interface Handler
{
/**
* Retrieve the parent handler that will first handle this task, before this child Handler
*
* @return Handler|null
*/
public function getParentHandler(): ?Handler;
/**
* Set the parent handler that will fire before this Handler
*
* @param Handler $parentHandler
*/
public function setParentHandler(Handler $parentHandler): void;
/**
* Import the parent output into the child
*
* @param mixed $input
*/
public function setParentInput($input): void;
/**
* The handler method used to handle this task.
* This handler will execute the actual task.

View File

@ -67,74 +67,84 @@ class ShellWorker
}
/**
* @param string $taskID
* Run a task by finding its ID
*
* @param string $taskId
* @param bool $post
* @throws EventException
* @throws TasksException
*/
public function run(string $taskID, bool $post = false)
public function runTaskById(string $taskId, bool $post = false)
{
// First fetch the task
try {
$task = $this->taskStorage->getTaskById($taskID);
$task = $this->taskStorage->getTaskById($taskId);
} catch (TasksException $e) {
throw new TasksException("Could not run worker. Task not found.");
}
$this->run($task, $post);
}
/**
* @param Task $task
* @param bool $post
* @throws EventException
* @throws TasksException
*/
public function run(Task $task, bool $post = false)
{
// Fire a taskHandleEvent
/** @var TaskHandleEvent $event */
$event = Events::fireEvent(new TaskHandleEvent(), $task);
$task = $event->getTask();
// Set task to this worker
$this->task = $task;
$this->task = $event->getTask();
$this->post = $post;
// Fetch the callable
$object = $this->task->getHandler();
if (!$object instanceof Handler)
{
$errors = "Could not run task. '".get_class($object)."' is not instance of Handler.";
if (!$post)
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
$handler = $this->task->getHandler();
throw new TasksException("Could not run task. '".get_class($object)."' is not instance of Handler.");
}
// Execute the handler and all its parent handlers
$success = $this->executeHandler($this->task, $handler, $post);
// Run postHandler if post mode is requested
if ($post)
{
$postSuccess = $object->postHandler($this->task);
$postOutput = $object->getPostOutput();
$postOutput = is_null($postOutput) ? '' : (string) $postOutput;
$postErrors = $this->getErrors();
if (!$postSuccess)
$this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::FAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::SUCCESS, $this->task->getRetries());
$this->output($postOutput, $postErrors);
return;
}
// Run primaryHandler if requested
$success = $object->primaryHandler($this->task);
$output = $object->getOutput();
$output = is_null($output) ? '' : (string) $output;
// Fetch the output and errors
$output = $post ? $handler->getPostOutput() : $handler->getOutput();
$output = is_null($output) ? '' : $output;
$errors = $this->getErrors();
// And afterwards write the results to the TaskStorage
if (!$success)
// If the task failed, write so to task storage, based on whether this is a post request or not
if (!$success && $post)
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries());
elseif (!$success && !$post)
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries());
elseif ($success && $post)
$this->taskStorage->writePostOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries());
else
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries());
$this->output($output, $errors);
// And write the final output
$this->output((string) $output, $errors);
}
protected function executeHandler(Task $task, Handler $handler, bool $usePost = false): bool
{
// First check to see if there is a parent handler
$parent = $handler->getParentHandler();
if (!is_null($parent)) {
// Execute the parent
if ($this->executeHandler($task, $parent, $usePost) === false)
return false;
// Fetch the output of the parent
$output = $usePost ? $parent->getPostOutput() : $parent->getOutput();
// And insert it as input into the child handler
$handler->setParentInput($output);
}
return $usePost ? $handler->postHandler($task) : $handler->primaryHandler($task);
}
/**
* In case a fatal error or exception occurs, the errors shall be redirected to stderr
*

View File

@ -50,6 +50,26 @@ class Tasks implements iLibrary
*/
private $cfg;
/**
* @var TaskStorage
*/
private $taskStorage;
/**
* @var Executor
*/
private $executor;
/**
* @var SuperVisor
*/
private $supervisor;
/**
* @var ShellWorker
*/
private $shellWorker;
/**
* Tasks constructor.
*
@ -80,6 +100,9 @@ class Tasks implements iLibrary
*/
public function getSuperVisor(string $bootstrapFile): SuperVisor
{
if (isset($this->supervisor))
return $this->supervisor;
// First get the configuration for SuperVisors
$cfg = $this->cfg->get('SuperVisor');
@ -104,6 +127,7 @@ class Tasks implements iLibrary
if (!$object instanceof SuperVisor)
throw new TasksException("Could not get SuperVisor. Type of '$class' is not instanceof TaskStorage.");
$this->supervisor = $object;
return $object;
}
@ -113,7 +137,11 @@ class Tasks implements iLibrary
*/
public function getWorker(): ShellWorker
{
return new ShellWorker($this->getTaskStorage());
if (isset($this->shellWorker))
return $this->shellWorker;
$this->shellWorker = new ShellWorker($this->getTaskStorage());
return $this->shellWorker;
}
/**
@ -124,6 +152,9 @@ class Tasks implements iLibrary
*/
public function getTaskStorage(): TaskStorage
{
if (isset($this->taskStorage))
return $this->taskStorage;
// First get the configuration for TaskStorage
$cfg = $this->cfg->get('TaskStorage');
@ -145,7 +176,8 @@ class Tasks implements iLibrary
if (!$object instanceof TaskStorage)
throw new TasksException("Could not get TaskStorage. Type '$class' is not instanceof TaskStorage.");
return $object;
$this->taskStorage = $object;
return $this->taskStorage;
}
/**
@ -157,6 +189,9 @@ class Tasks implements iLibrary
*/
protected function getExecutor(string $bootstrapFile): Executor
{
if (isset($this->executor))
return $this->executor;
// First get the configuration for Executor
$cfg = $this->cfg->get('Executor');
@ -178,7 +213,8 @@ class Tasks implements iLibrary
if (!$object instanceof Executor)
throw new TasksException("Could not get Executor. Type '$class' is not instanceof Executor.");
return $object;
$this->executor = $object;
return $this->executor;
}
/**

View File

@ -146,7 +146,7 @@ class ShellExecutorTest extends TestCase
$task = $this->executor->startTask($dummyTask);
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Assert that the output is the same
$this->assertSame($dummyTask, $task);
@ -173,7 +173,7 @@ class ShellExecutorTest extends TestCase
$dummyTask = $this->executor->startTask($dummyTask);
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// And we fetch some task statistics
$stats = $this->executor->getTaskStats($dummyTask);
@ -217,7 +217,7 @@ class ShellExecutorTest extends TestCase
$dummyTask = $this->executor->startTask($dummyTask);
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Check if the task is running
$this->assertTrue($this->executor->getTaskRunning($dummyTask));
@ -226,7 +226,7 @@ class ShellExecutorTest extends TestCase
$output = $this->executor->stopTask($dummyTask);
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// We check that the output actually is the task
$this->assertSame($dummyTask, $output);

View File

@ -34,9 +34,9 @@
* @version Version 1.0.0
*/
use FuzeWorks\Async\Handler;
use FuzeWorks\Async\ShellWorker;
use FuzeWorks\Async\Task;
use FuzeWorks\Async\Tasks;
use FuzeWorks\Async\TaskStorage;
use FuzeWorks\Events;
use PHPUnit\Framework\TestCase;
@ -57,13 +57,13 @@ class ShellWorkerTest extends TestCase
public function setUp(): void
{
// Load the TaskStorage so temporary tasks can be stored
$tasks = new Tasks();
$this->taskStorage = $tasks->getTaskStorage();
// This system uses DummyTaskStorage because Redis can't serialize the Mock Handlers. Doesn't matter much anyway.
$this->taskStorage = new TaskStorage\DummyTaskStorage([]);
$this->shellWorker = new ShellWorker($this->taskStorage);
$this->taskStorage->reset();
// Clear events
Events::$listeners = [];
$this->shellWorker = $tasks->getWorker();
}
public function testClass()
@ -75,4 +75,165 @@ class ShellWorkerTest extends TestCase
// @todo Add lots of tests and amend ShellWorker to return results
/**
* @depends testClass
*/
public function testUseHandler()
{
// First prepare a Mock Handler
$mockHandler = $this->createMock(Handler::class);
$mockHandler->expects($this->exactly(2))->method('getParentHandler')->willReturn(null);
$mockHandler->expects($this->once())
->method('primaryHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(true);
$mockHandler->expects($this->once())
->method('postHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(true);
$mockHandler->expects($this->once())->method('getOutput')->willReturn('Some Output!');
$mockHandler->expects($this->once())->method('getPostOutput')->willReturn('Post Output!');
// Create a Dummy task
$dummyTask = new Task('testUseHandler', $mockHandler);
$this->taskStorage->addTask($dummyTask);
// Run the task in ShellWorker
$this->shellWorker->run($dummyTask, false);
// And verify if the Output is correctly set
$output = $this->taskStorage->readTaskOutput($dummyTask);
$this->assertEquals('Some Output!', $output['output']);
$this->assertEquals(Task::SUCCESS, $output['statusCode']);
// And run the post handler
$this->shellWorker->run($dummyTask, true);
$output = $this->taskStorage->readPostOutput($dummyTask);
$this->assertEquals('Post Output!', $output['output']);
$this->assertEquals(Task::SUCCESS, $output['statusCode']);
}
/**
* @depends testUseHandler
*/
public function testFailingHandlers()
{
$mockHandler = $this->createMock(Handler::class);
$mockHandler->expects($this->once())
->method('primaryHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(false);
$mockHandler->expects($this->once())
->method('postHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(false);
// Create a Dummy task
$dummyTask = new Task('testFailingHandlers1', $mockHandler);
$this->taskStorage->addTask($dummyTask);
// Run the task in ShellWorker
$this->shellWorker->run($dummyTask, false);
// And verify if the Output is correctly set
$output = $this->taskStorage->readTaskOutput($dummyTask);
$this->assertEquals('', $output['output']);
$this->assertEquals(Task::FAILED, $output['statusCode']);
// And run a post failure
$this->shellWorker->run($dummyTask, true);
$output = $this->taskStorage->readPostOutput($dummyTask);
$this->assertEquals('', $output['output']);
$this->assertEquals(Task::FAILED, $output['statusCode']);
}
/**
* @depends testUseHandler
*/
public function testParentHandlers()
{
// First create the Handlers
$parentHandler = $this->createMock(Handler::class);
$childHandler = $this->createMock(Handler::class);
// Prepare parent handler output
$parentHandler->expects($this->once())
->method('primaryHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(true);
$parentHandler->expects($this->once())
->method('getOutput')
->willReturn('Parent Output');
// Prepare the child handler
$childHandler->expects($this->once())
->method('getParentHandler')
->willReturn($parentHandler);
$childHandler->expects($this->once())
->method('setParentInput')
->with($this->equalTo('Parent Output'));
$childHandler->expects($this->once())
->method('primaryHandler')
->with($this->callback(function($subject){return $subject instanceof Task;}))
->willReturn(true);
$childHandler->expects($this->once())
->method('getOutput')
->willReturn('Child Output');
// Set the relation
$childHandler->setParentHandler($parentHandler);
// Create the Dummy Task
$dummyTask = new Task('testParentHandlers', $childHandler);
$this->taskStorage->addTask($dummyTask);
// Run the task in ShellWorker
$this->shellWorker->run($dummyTask, false);
// And verify if the Output is correctly set
$output = $this->taskStorage->readTaskOutput($dummyTask);
$this->assertEquals('Child Output', $output['output']);
$this->assertEquals(Task::SUCCESS, $output['statusCode']);
}
/**
* @depends testParentHandlers
*/
public function testCascadingParentFailure()
{
// First create the Handlers
$parentHandler = $this->createMock(Handler::class);
$childHandler = $this->createMock(Handler::class);
// Set the relation
$childHandler->setParentHandler($parentHandler);
$childHandler->expects($this->once())
->method('getParentHandler')
->willReturn($parentHandler);
// Set the results
$parentHandler->expects($this->once())
->method('primaryHandler')
->willReturn(false);
$childHandler->expects($this->never())
->method('primaryHandler')
->willReturn(true);
// And some methods which shall be called in the end
$childHandler->expects($this->once())
->method('getOutput')
->willReturn('Task failed successfully');
// Create the task to run this
$dummyTask = new Task('testCascadingParentFailure', $childHandler);
$this->taskStorage->addTask($dummyTask);
// Run the task in ShellWorker
$this->shellWorker->run($dummyTask, false);
// And verify whether the task has indeed failed
$output = $this->taskStorage->readTaskOutput($dummyTask);
$this->assertEquals('Task failed successfully', $output['output']);
$this->assertEquals(Task::FAILED, $output['statusCode']);
}
}

View File

@ -83,4 +83,26 @@ class ArgumentedHandler implements Handler
{
return $this->output;
}
/**
* @inheritDoc
*/
public function getParentHandler(): ?Handler
{
return null;
}
/**
* @inheritDoc
*/
public function setParentInput($input): void
{
}
/**
* @inheritDoc
*/
public function setParentHandler(Handler $parentHandler): void
{
}
}

View File

@ -69,4 +69,26 @@ class EmptyHandler implements Handler
public function getPostOutput()
{
}
/**
* @inheritDoc
*/
public function getParentHandler(): ?Handler
{
return null;
}
/**
* @inheritDoc
*/
public function setParentInput($input): void
{
}
/**
* @inheritDoc
*/
public function setParentHandler(Handler $parentHandler): void
{
}
}

View File

@ -73,4 +73,26 @@ class TestStartAndReadTasksHandler implements Handler
{
}
/**
* @inheritDoc
*/
public function getParentHandler(): ?Handler
{
return null;
}
/**
* @inheritDoc
*/
public function setParentInput($input): void
{
}
/**
* @inheritDoc
*/
public function setParentHandler(Handler $parentHandler): void
{
}
}

View File

@ -71,4 +71,26 @@ class TestStopTaskHandler implements Handler
public function getPostOutput()
{
}
/**
* @inheritDoc
*/
public function getParentHandler(): ?Handler
{
return null;
}
/**
* @inheritDoc
*/
public function setParentInput($input): void
{
}
/**
* @inheritDoc
*/
public function setParentHandler(Handler $parentHandler): void
{
}
}

View File

@ -113,7 +113,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -145,7 +145,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -178,7 +178,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -211,7 +211,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -244,7 +244,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -277,7 +277,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -310,7 +310,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Then re-fetch the Task
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -354,7 +354,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// Reload all tasks from TaskStorage
$dummyTaskFailedYes = $this->taskStorage->getTaskById($dummyTaskFailedYes->getId());
@ -401,7 +401,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// And check if the Task has been cancelled
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -429,7 +429,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// And check if the Task has been moved to Post
$dummyTask = $this->taskStorage->getTaskById($dummyTask->getId());
@ -460,7 +460,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// And check if the Tasks have been completed or moved to post
$dummyTaskPostNo = $this->taskStorage->getTaskById($dummyTaskPostNo->getId());
@ -491,7 +491,7 @@ class ParallelSuperVisorTest extends TestCase
$this->superVisor->cycle();
// Pause 1/10th of a second
usleep(100000);
usleep(500000);
// And check if the Tasks have been completed or failed
$dummyTaskFinished = $this->taskStorage->getTaskById($dummyTaskFinished->getId());