Built foundation.

System is now fully in a working order, though many parts must still be developed.
System still requires the following components:
- Dependency Constraint. Only run when all dependencies have ran.
- TaskScheduler. Standardized task that plans task periodically and maintains consistency.
- Better TaskStorage.
- Proper documentation
This commit is contained in:
Abel Hoogeveen 2020-01-29 21:24:26 +01:00
parent bcbb7535b1
commit 9c08114008
No known key found for this signature in database
GPG Key ID: 96C2234920BF4292
13 changed files with 614 additions and 202 deletions

7
Dockerfile Normal file
View File

@ -0,0 +1,7 @@
FROM php:7.3-cli-buster
RUN apt-get update &&\
apt-get install --no-install-recommends --assume-yes --quiet procps ca-certificates curl git &&\
rm -rf /var/lib/apt/lists/*
# PDO
RUN docker-php-ext-install pdo_mysql

View File

@ -17,6 +17,9 @@
"fuzeworks/core": "~1.2.0",
"ext-json": "*"
},
"require-dev": {
"fuzeworks/tracycomponent": "~1.2.0"
},
"autoload": {
"psr-4": {
"FuzeWorks\\Async\\": "src/FuzeWorks/Async"

View File

@ -0,0 +1,96 @@
<?php
/**
* FuzeWorks Async Library
*
* The FuzeWorks PHP FrameWork
*
* Copyright (C) 2013-2020 TechFuze
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* @author TechFuze
* @copyright Copyright (c) 2013 - 2020, TechFuze. (http://techfuze.net)
* @license https://opensource.org/licenses/MIT MIT License
*
* @link http://techfuze.net/fuzeworks
* @since Version 1.0.0
*
* @version Version 1.0.0
*/
namespace FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Task;
/**
* Class FixedTimeConstraint
*
* Use this constraint to prevent the execution of a task before a certain time.
* This constraint is particularly useful for planning and spreading the execution of tasks.
*
* @package FuzeWorks\Async\Constraint
*/
class FixedTimeConstraint implements Constraint
{
/**
* @var int
*/
public $timestamp;
/**
* FixedTimeConstraint constructor.
*
* Provide a unix timestamp for when this task should be run.
* The SuperVisor will start the task if the time has exceeded the provided timestamp
*
* @param int $unitTimeStamp Unix time to start this task
*/
public function __construct(int $unitTimeStamp)
{
$this->timestamp = $unitTimeStamp;
}
/**
* @inheritDoc
*/
public function intervene(Task $task): bool
{
if ($task->getStatus() === Task::PENDING && time() < $this->timestamp)
return true;
return false;
}
/**
* @inheritDoc
*/
public function blockCode(): int
{
return Task::DELAYED;
}
/**
* @inheritDoc
*/
public function delayTime(): int
{
return $this->timestamp;
}
}

View File

@ -36,6 +36,7 @@
namespace FuzeWorks\Async\Executors;
use FuzeWorks\Async\Executor;
use FuzeWorks\Async\Process;
use FuzeWorks\Async\Task;
use FuzeWorks\Async\TasksException;
@ -64,7 +65,6 @@ class ShellExecutor implements Executor
$this->worker = $workerFile;
}
private function shellExec($format, array $parameters = [])
{
$parameters = array_map("escapeshellarg", $parameters);
@ -81,18 +81,8 @@ class ShellExecutor implements Executor
// Then execute the command using the base64_encoded string of the taskID
$output = $this->shellExec($commandString, [base64_encode($task->getId())]);
// Add the PID to the task and change its state to 'RUNNING'
if ($post)
{
$task->setStatus(Task::POST);
$task->addAttribute('post_pid', $output[0]);
}
else
{
$task->setStatus(Task::RUNNING);
$task->addAttribute('run_pid', $output[0]);
}
$pid = intval($output[0]);
$task->setProcess(new Process($pid));
// And finally return the task
return $task;
@ -114,28 +104,38 @@ class ShellExecutor implements Executor
// First prepare the command used to gather info on processes
$commandString = "ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d";
// Then fetch the PID
try {
$pid = is_null($task->attribute('post_pid')) ? $task->attribute('run_pid') : $task->attribute('post_pid');
$output = $this->shellExec($commandString, [$pid]);
if (count($output) < 1)
return null;
$last = $output[count($output) - 1];
if (trim($last) === "")
return null;
$parts = preg_split("/\s+/", trim($last));
$pid = intval($parts[0]);
$state = strtoupper(trim($parts[3]));
if ("{$pid}" !== $parts[0] || $state === 'Z')
return null;
return $parts;
} catch (TasksException $e) {
// First we must determine what process is used.
$process = $task->getProcess();
if (is_null($process))
return null;
}
// Then using that process we determine the ProcessID
$pid = $process->getPid();
// And we execute the commandString to fetch info on the process
$output = $this->shellExec($commandString, [$pid]);
// If not output is provided, the command failed and should return null
if (count($output) < 1)
return null;
// ??
$last = $output[count($output) - 1];
if (trim($last) === "")
return null;
// Split up the info
$parts = preg_split("/\s+/", trim($last));
// Determine the state of the process
// If the process is in a 'zombie' state, it should be considered fully executed.
// Cleanup of Zombie processes must take place by periodically restarting the SuperVisor, or by using a SuperVisor which does not have the zombie problem
$state = strtoupper(trim($parts[3]));
if ("{$pid}" !== $parts[0] || $state === 'Z')
return null;
// Finally, return the Task information
return $parts;
}
public function getTaskExitCode(Task $task): int

View File

@ -63,9 +63,8 @@ interface Handler
*
* Remember: postHandler is called in a new process. Data is not shared between processes unless done manually.
* @param Task $task
* @return bool
*/
public function postHandler(Task $task): bool;
public function postHandler(Task $task);
/**
* Any output generated by postHandler should be returned here

View File

@ -33,31 +33,41 @@
*
* @version Version 1.0.0
*/
use FuzeWorks\Async\Tasks;
use FuzeWorks\Logger;
require_once('vendor/autoload.php');
namespace FuzeWorks\Async;
class Process
{
// Open configurator
$configurator = new FuzeWorks\Configurator();
const PENDING = 1;
const RUNNING = 2;
const FAILED = 3;
const FINISHED = 4;
// Set up basic settings
$configurator->setTimeZone('Europe/Amsterdam');
$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp');
$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log');
/**
* The current status of the process
*
* @var int
*/
protected $status = Process::PENDING;
// Add Async library
$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks');
/**
* @var int
*/
protected $pid;
// Debug
$configurator->enableDebugMode()->setDebugAddress('ALL');
public function __construct(int $pid)
{
$this->pid = $pid;
}
// Create container
$container = $configurator->createContainer();
/**
* Receive the process Id of this process
*
* @return int
*/
public function getPid(): int
{
return $this->pid;
}
// Add lib
Logger::enableScreenLog();
// RUN THE APP
/** @var Tasks $lib */
$lib = $container->libraries->get('async');
}

View File

@ -77,115 +77,179 @@ class ParallelSuperVisor implements SuperVisor
if (empty($this->tasks))
return SuperVisor::FINISHED;
// Check if all tasks are completed
$allCompleted = true;
foreach ($this->tasks as $task)
if ($task->getStatus() !== Task::COMPLETED)
$allCompleted = false;
if ($allCompleted)
return SuperVisor::FINISHED;
for ($i=0;$i<count($this->tasks);$i++)
{
$task = $this->tasks[$i];
dump(date('H:i:s') . ': ' . $task->getId() .'/'.$i . ": state #" . $task->getStatus());
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
dump(date('H:i:s') . ': ' . $task->getId() .'/'.$i . ": state " . Task::getStatusType($task->getStatus()));
// DELAYED Tasks. If a task is delayed, but the time has passed, mark the task as pending
if ($task->getStatus() === Task::DELAYED && time() < $task->getDelayTime())
$task->setStatus(Task::PENDING);
// FAILED Tasks, check if failed can be made pending using constraints
elseif ($task->getStatus() === Task::FAILED)
// PENDING: should start if not constrained
if ($task->getStatus() === Task::PENDING)
{
// Test if constrained
$task = $this->testConstraints($task);
// FINISHED/CANCELLED/FAILED Tasks, check if they are old enough for removal
elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED || $task->getStatus() === Task::FAILED)
{
}
// If the task is pending, start if not constrained
elseif ($task->getStatus() === Task::PENDING)
{
// First test for constraints.
$task = $this->testConstraints($task);
// If the task is still PENDING, execute it
// If the task changed status, task is no longer pending and should be processed by another statement
if ($task->getStatus() !== Task::PENDING)
continue;
// START THE PROCESS
// Start the process using the executor service
$task = $this->executor->startTask($task);
$task->setStatus(Task::RUNNING);
// Modify the task in TaskStorage
$this->taskStorage->modifyTask($task);
}
// Check if tasks are still running
// DELAYED: If task is delayed, and enough time has passed, change the status back to pending
elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime())
{
$task->setStatus(Task::PENDING);
$this->taskStorage->modifyTask($task);
}
// CANCELLED/COMPLETED: remove the task if requested to do so
elseif ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED)
{
}
// RUNNING: check if task is still running. If not, set result based on output
elseif ($task->getStatus() === Task::RUNNING)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task);
$output = $this->taskStorage->readTaskOutput($task, $task->getRetries());
$hasOutput = !is_null($output);
// If nothing is found, the process has crashed and status PFAILED should be set
if (!$isRunning && !$hasOutput)
$task->setStatus(Task::FAILED);
$task->setStatus(Task::PFAILED);
// @todo Set PFAILED after $max_Time
// If output is found, use the status code from that
elseif (!$isRunning && $hasOutput)
{
try {
$task->setOutput($output['output'], $output['errors']);
$task->setStatus($output['statusCode']);
} catch (TasksException $e) {
// Ignore
// On failure to set output, consider as a process failure
$task->setStatus(Task::PFAILED);
}
}
// elseif (EXPIRED)
// In any other situation the process is still running and should be left alone
else
continue;
// If any changes have been made, they should be written to TaskStorage
$this->taskStorage->modifyTask($task);
}
// FAILED: if a process has failed, attempt to rety if requested to do so
elseif ($task->getStatus() === Task::PFAILED || $task->getStatus() === Task::FAILED)
{
// First fetch retry conditions
$settings = $task->getRetrySettings();
// First test if any retries should be tried at all
if ($settings['retryOnFail'] === true && $task->getRetries() < $settings['maxRetries'])
{
// Then test if this type of failure should be retried and whether the mexRetries has been exceeded
if (
($task->getStatus() === Task::PFAILED && $settings['retryPFailures'] === true) ||
($task->getStatus() === Task::FAILED && $settings['retryRFailures'] === true)
)
{
// If eligible, reset task to pending
$task->addRetry();
$task = $this->executor->startTask($task);
$task->setStatus(Task::RUNNING);
$this->taskStorage->modifyTask($task);
continue;
}
}
// If the task is not eligible for a retry, either cancel it or move it to a postHandler
if ($task->getUsePostHandler() === true)
{
$task->resetRetries();
$task = $this->executor->startTask($task, true);
$task->setStatus(Task::POST);
}
else
$task->setStatus(Task::CANCELLED);
$this->taskStorage->modifyTask($task);
}
// Check if task succeeded, and whether the task must progress to POST or COMPLETED
// @todo Add failed handler
// SUCCESS: if a task has succeeded, see if it needs a postHandler
elseif ($task->getStatus() === Task::SUCCESS)
{
// Calling post handler when requested to do so
if ($task->getCallPostHandler() === true || ($task->getCallPostHandlerWhenFailed() === true && $output['statusCode'] === Task::FAILED))
if ($task->getUsePostHandler() === true)
{
$task->resetRetries();
$task = $this->executor->startTask($task, true);
$task->setStatus(Task::POST);
}
else
$task->setStatus(Task::COMPLETED);
$this->taskStorage->modifyTask($task);
}
// If the task is in post mode, check when that process has ended
// POST: when a task is currently running in it's postHandler
elseif ($task->getStatus() === Task::POST)
{
$isRunning = $this->executor->getTaskRunning($task);
$output = $this->taskStorage->readTaskOutput($task);
$hasOutput = isset($output['postStatus']);
$output = $this->taskStorage->readPostOutput($task, $task->getRetries());
$hasOutput = !is_null($output);
// If a task is not running and has no output, an error has occurred
if (!$isRunning && !$hasOutput)
$task->setStatus(Task::FAILED);
{
// Test if a retry should be attempted
$settings = $task->getRetrySettings();
if ($settings['retryOnFail'] === true && $settings['retryPostFailures'] === true && $settings['maxRetries'] > $task->getRetries())
{
$task->addRetry();
$task = $this->executor->startTask($task, true);
}
elseif ($settings['maxRetries'] <= $task->getRetries())
$task->setStatus(Task::CANCELLED);
}
// @todo Retry after $max_Time
// If a task is not running and has output, set that output and mark as completed
elseif (!$isRunning && $hasOutput)
{
try {
$task->setPostOutput($output['postOutput'], $output['postErrors']);
$task->setStatus(Task::COMPLETED);
} catch (TasksException $e) {
// Ignore
}
$task->setPostOutput($output['output'], $output['errors']);
$task->setStatus(Task::COMPLETED);
}
// If the task is still running, leave it be
else
continue;
// If any changes have been made, they should be written to TaskStorage
$this->taskStorage->modifyTask($task);
}
}
return SuperVisor::RUNNING;
// Check if all tasks are completed
$allCompleted = true;
$anyDelayed = false;
foreach ($this->tasks as $task)
{
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
$allCompleted = false;
elseif ($task->getStatus() === Task::DELAYED)
$anyDelayed = true;
}
// If all are finished and none are delayed
if ($allCompleted && !$anyDelayed)
return SuperVisor::FINISHED;
if ($allCompleted && $anyDelayed)
return SuperVisor::CONSTRAINED;
else
return SuperVisor::RUNNING;
}
private function testConstraints(Task $task): Task
@ -197,7 +261,7 @@ class ParallelSuperVisor implements SuperVisor
{
$task->setStatus($constraint->blockCode());
if ($constraint->blockCode() === Task::DELAYED)
$task->setDelayTime(time() + $constraint->delayTime());
$task->setDelayTime($constraint->delayTime());
// Save changes to TaskStorage
$this->taskStorage->modifyTask($task);

View File

@ -38,14 +38,77 @@ namespace FuzeWorks\Async;
class Task
{
/**
* If a task is waiting to be executed, its status will be PENDING
*/
const PENDING = 1;
/**
* If a task is currently in process, its status will be RUNNING
*/
const RUNNING = 2;
/**
* If a task does not reach the goal it sets out to, its status should be set to FAILED.
*
* Not to be confused with Process::FAILED. Process::FAILED will be the Process status when a Process running a Task unexpectedly stops running.
* This will set the Task to Task::PFAILED
*/
const FAILED = 3;
const SUCCESS = 4;
const POST = 5;
const COMPLETED = 6;
const DELAYED = 7;
const CANCELLED = 8;
const PFAILED = 4;
/**
* If a task has reached its goal, its status should be set to SUCCESS
*/
const SUCCESS = 5;
/**
* If a task is in process of its postHandler, its status will be POST
*/
const POST = 6;
/**
* If a task has completed all its goals, its status will be COMPLETED
*/
const COMPLETED = 7;
/**
* If a task has a timed based constraint, its status will be DELAYED
*
* Task is delayed by the time in $delayTime
*/
const DELAYED = 8;
/**
* If a task has a constraint preventing it from running at any point, its status will be CANCELLED
*/
const CANCELLED = 9;
public static function getStatusType($statusId)
{
switch ($statusId) {
case 1:
return 'Task::PENDING';
case 2:
return 'Task::RUNNING';
case 3:
return 'Task::FAILED';
case 4:
return 'Task::PFAILED';
case 5:
return 'Task::SUCCESS';
case 6:
return 'Task::POST';
case 7:
return 'Task::COMPLETED';
case 8:
return 'Task::DELAYED';
case 9:
return 'Task::CANCELLED';
default:
return false;
}
}
/**
* @var string
@ -57,6 +120,11 @@ class Task
*/
protected $handlerClass;
/**
* @var bool
*/
protected $usePostHandler = false;
/**
* @var array
*/
@ -100,21 +168,21 @@ class Task
*
* @var array
*/
protected $data = [];
protected $attributes = [];
/**
* Whether the post handler should be called after every task execution
*
* @var bool
* @var Process
*/
protected $callPostHandler = false;
protected $process;
/**
* Whether the post handler should be called after execution has failed
*
* @var bool
*/
protected $callPostHandlerWhenFailed = false;
/* -------- Some settings ------------ */
protected $retryOnFail = false;
protected $maxRetries = 2;
protected $retryPFailures = true;
protected $retryRFailures = true;
protected $retryPostFailures = true;
protected $retries = 0;
/**
* Task constructor.
@ -123,13 +191,15 @@ class Task
*
* @param string $identifier The unique identifier of this task. Make sure it is always unique!
* @param string $handlerClass The class that shall handle this task
* @param bool $usePostHandler Whether the postHandler on handlerClass should also be used
* @param mixed $parameters,... The arguments provided to the method that shall handle this class
* @throws TasksException
*/
public function __construct(string $identifier, string $handlerClass)
public function __construct(string $identifier, string $handlerClass, bool $usePostHandler = false)
{
$this->taskId = $identifier;
$this->handlerClass = $handlerClass;
$this->usePostHandler = $usePostHandler;
if (func_num_args() > 3)
$args = array_slice(func_get_args(), 3);
else
@ -162,6 +232,16 @@ class Task
return $this->handlerClass;
}
/**
* Whether the postHandler on the handlerClass should be invoked after processing the initial task.
*
* @return bool
*/
public function getUsePostHandler(): bool
{
return $this->usePostHandler;
}
/**
* Gets the arguments to be provided to the method of the class that shall process this task
*
@ -205,6 +285,8 @@ class Task
/**
* Sets the status of this Task.
*
* Must be one of the constants of this Task class
*
* @param int $status
*/
public function setStatus(int $status)
@ -240,10 +322,10 @@ class Task
*/
public function attribute(string $key)
{
if (!isset($this->data[$key]))
if (!isset($this->attributes[$key]))
return null;
return $this->data[$key];
return $this->attributes[$key];
}
/**
@ -258,7 +340,7 @@ class Task
if (!$this->isSerializable($value))
throw new TasksException("Could not set Task '$this->taskId' attribute '$key'. Value not serializable.");
$this->data[$key] = $value;
$this->attributes[$key] = $value;
}
/**
@ -276,31 +358,6 @@ class Task
return $this->postOutput;
}
/**
* Sets the conditions for when the Handler::postHandler should be called.
*
* If you want the postHandler to only get called on errors, set onEvery to false and onFail to true.
*
* @param bool $onEvery Call the post handler on every result
* @param bool $onFail Call the post handler only when failed
*/
public function setPostHandler(bool $onEvery = false, bool $onFail = false)
{
$this->callPostHandler = $onEvery;
$this->callPostHandlerWhenFailed = $onFail;
}
public function getCallPostHandler(): bool
{
return $this->callPostHandler;
}
public function getCallPostHandlerWhenFailed(): bool
{
return $this->callPostHandlerWhenFailed;
}
/**
* Return the errors of this task execution
*
@ -317,24 +374,121 @@ class Task
}
/**
* @todo Handle output from multiple attempts
* @param string $output
* @param string $errors
*/
public function setOutput(string $output, string $errors)
{
if (!is_null($this->output) || !is_null($this->errors))
throw new TasksException("Could not set output. Output already set.");
$this->output = $output;
$this->errors = $errors;
}
/**
* @todo Handle output from multiple attempts
* @param string $output
* @param string $errors
*/
public function setPostOutput(string $output, string $errors)
{
if (!is_null($this->postOutput) || !is_null($this->postErrors))
throw new TasksException("Could not set post output. Output already set.");
$this->postOutput = $output;
$this->postErrors = $errors;
}
/**
* Sets the initial process for this task.
*
* To be set by Executor
*
* @param Process $process
*/
public function setProcess(Process $process)
{
$this->process = $process;
}
/**
* Returns the initial process for this task
*
* @return Process|null
*/
public function getProcess(): ?Process
{
return $this->process;
}
public function removeProcess(): bool
{
if ($this->process instanceof Process)
{
$this->process = null;
return true;
}
return false;
}
/**
* Set whether this task should retry after a failure, and how many times
*
* @param bool $retryOnFail
* @param int $maxRetries
* @param bool $retryRegularFailures
* @param bool $retryProcessFailures
* @param bool $retryPostFailures
*/
public function setRetrySettings(bool $retryOnFail, int $maxRetries = 2, bool $retryRegularFailures = true, bool $retryProcessFailures = true, bool $retryPostFailures = true)
{
$this->retryOnFail = $retryOnFail;
$this->maxRetries = $maxRetries;
$this->retryPFailures = $retryProcessFailures;
$this->retryRFailures = $retryRegularFailures;
$this->retryPostFailures = $retryPostFailures;
}
/**
* Returns the failure retry settings
*
* @return array
*/
public function getRetrySettings(): array
{
return [
'retryOnFail' => $this->retryOnFail,
'maxRetries' => $this->maxRetries,
'retryPFailures' => $this->retryPFailures,
'retryRFailures' => $this->retryRFailures,
'retryPostFailures' => $this->retryPostFailures
];
}
/**
* Add a retry to the retry counter
*/
public function addRetry()
{
$this->retries++;
}
/**
* Reset the retry counter back to 0
*/
public function resetRetries()
{
$this->retries = 0;
}
/**
* Receive the amount of retries already attempted
*
* @return int
*/
public function getRetries(): int
{
return $this->retries;
}
/**
* Checks whether an object can be serialized
*

View File

@ -88,34 +88,60 @@ interface TaskStorage
/**
* Write the task output into TaskStorage.
*
* @param Task $task
* @param string $output
* @param string $errors
* @param int $statusCode
* @return bool
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool;
/**
* Write the output of the postHandler into TaskStorage
* $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output.
*
* @param Task $task
* @param string $output
* @param string $errors
* @param int $statusCode
* @param int $attempt
* @return bool
* @throws TasksException
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool;
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool;
/**
* Write the output of the postHandler into TaskStorage
*
* $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output.
*
* @param Task $task
* @param string $output
* @param string $errors
* @param int $statusCode
* @param int $attempt
* @return bool
* @throws TasksException
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool;
/**
* Read the task output from taskStorage.
*
* The following output is expected:
* array('output' => string $output, 'errors' => string $errors, 'status' => $code)
* OR null of not found (yet)
* OR null if not found (yet)
*
* $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output.
*
* @param Task $task
* @param int $attempt
* @return array
*/
public function readTaskOutput(Task $task): ?array;
public function readTaskOutput(Task $task, int $attempt = 0): ?array;
/**
* Read the output from the postHandler
*
* The following output is expected:
* array('output' => string $output, 'errors' => string $errors, 'status' => $code)
* OR null if not found (yet)
*
* $attempt refers to $task->getRetries(). If 0, it is the initial attempt. If > 0, it seeks a retry output.
*
* @param Task $task
* @param int $attempt
* @return array|null
*/
public function readPostOutput(Task $task, int $attempt = 0): ?array;
}

View File

@ -184,11 +184,11 @@ class ArrayTaskStorage implements TaskStorage
* @inheritDoc
* @throws TasksException
*/
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode): bool
public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
{
// Get the directory of the main storage
$dir = dirname($this->fileName);
$file = $dir . DS . 'task_' . md5($task->getId()) . '_output.json';
$file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_output.json';
$contents = json_encode(['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]);
// If file already exists, panic
@ -205,11 +205,11 @@ class ArrayTaskStorage implements TaskStorage
* @inheritDoc
* @throws TasksException
*/
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode): bool
public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool
{
// Get the directory of the main storage
$dir = dirname($this->fileName);
$file = $dir . DS . 'task_' . md5($task->getId()) . '_post_output.json';
$file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_post_output.json';
$contents = json_encode(['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]);
// If file already exists, panic
@ -225,12 +225,12 @@ class ArrayTaskStorage implements TaskStorage
/**
* @inheritDoc
*/
public function readTaskOutput(Task $task): ?array
public function readTaskOutput(Task $task, int $attempt = 0): ?array
{
// Get the directory of the main storage
$dir = dirname($this->fileName);
$file = $dir . DS . 'task_' . md5($task->getId()) . '_output.json';
$file2 = $dir . DS . 'task_' . md5($task->getId()) . '_post_output.json';
$file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_output.json';
// If file doesn't exist, return so
if (!file_exists($file))
@ -239,20 +239,27 @@ class ArrayTaskStorage implements TaskStorage
// Decode the contents
$contents = file_get_contents($file);
$data = json_decode($contents, true);
$output = ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']];
// Add post data if necessary
if (file_exists($file2))
{
$postContents = file_get_contents($file2);
$postData = json_decode($postContents, true);
$output['postOutput'] = $postData['output'];
$output['postErrors'] = $postData['errors'];
$output['postStatus'] = $postData['statusCode'];
}
return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']];
}
// And return them
return $output;
/**
* @inheritDoc
*/
public function readPostOutput(Task $task, int $attempt = 0): ?array
{
// Get the directory of the main storage
$dir = dirname($this->fileName);
$file = $dir . DS . 'task_' . md5($task->getId()) . '_' . $attempt . '_post_output.json';
// If file doesn't exist, return so
if (!file_exists($file))
return null;
// Decode the contents
$contents = file_get_contents($file);
$data = json_decode($contents, true);
return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']];
}
private function commit()

View File

@ -50,6 +50,16 @@ class Worker
*/
protected $taskStorage;
/**
* @var Task
*/
protected $task;
/**
* @var bool
*/
protected $post;
public function __construct(TaskStorage $taskStorage)
{
$this->taskStorage = $taskStorage;
@ -72,45 +82,65 @@ class Worker
$event = Events::fireEvent(new TaskHandleEvent(), $task);
$task = $event->getTask();
// Set task to this worker
$this->task = $task;
$this->post = $post;
// Fetch the callable
$class = $task->getHandlerClass();
$class = $this->task->getHandlerClass();
if (!class_exists($class, true))
{
$errors = 'Could not run task. HandlerClass \'' . $class . '\' not found.';
if (!$post)
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
throw new TasksException("Could not run task. '$class' not found.");
}
// Create the handler
/** @var Handler $object */
$object = new $class();
if (!$object instanceof Handler)
{
$errors = "Could not run task. '$class' is not instance of Handler.";
if (!$post)
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::PFAILED, $this->task->getRetries());
throw new TasksException("Could not run task. '$class' is not instance of Handler.");
}
// Run postHandler if post mode is requested
if ($post)
{
$postSuccess = $object->postHandler($task);
$postSuccess = $object->postHandler($this->task);
$postOutput = $object->getPostOutput();
$postOutput = is_null($postOutput) ? '' : (string) $postOutput;
$postErrors = $this->getErrors();
if (!$postSuccess)
$this->taskStorage->writePostOutput($task, $postOutput, $postErrors, Task::FAILED);
$this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::FAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($task, $postOutput, $postErrors, Task::SUCCESS);
$this->taskStorage->writePostOutput($this->task, $postOutput, $postErrors, Task::SUCCESS, $this->task->getRetries());
$this->output($postOutput, $postErrors);
return;
}
// And execute
$success = $object->primaryHandler($task);
// Run primaryHandler if requested
$success = $object->primaryHandler($this->task);
$output = $object->getOutput();
$output = is_null($output) ? '' : (string) $output;
$errors = $this->getErrors();
// And afterwards write the results to the TaskStorage
if (!$success)
$this->taskStorage->writeTaskOutput($task, $output, $errors, Task::FAILED);
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::FAILED, $this->task->getRetries());
else
$this->taskStorage->writeTaskOutput($task, $output, $errors, Task::SUCCESS);
$this->taskStorage->writeTaskOutput($this->task, $output, $errors, Task::SUCCESS, $this->task->getRetries());
$this->output($output, $errors);
}
@ -128,6 +158,17 @@ class Worker
// Collect all error logs
$errors = $this->getErrors();
$this->output('', $errors);
try {
// Write to TaskStorage
if (!$this->post)
$this->taskStorage->writeTaskOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries());
else
$this->taskStorage->writePostOutput($this->task, '', $errors, Task::FAILED, $this->task->getRetries());
} catch (TasksException $e) {
// Ignore
}
exit;
}
@ -145,8 +186,7 @@ class Worker
continue;
$output[] = strtoupper($log['type']) . ' ' .
(!empty($log['logFile']) && !empty($log['logLine']) ? $log['logFile'] . ':' . $log['logLine'] . " '" : '') .
$log['message'];
(!empty($log['logFile']) && !empty($log['logLine']) ? $log['logFile'] . ':' . $log['logLine'] . " '" : "'") . $log['message'] . "'";
}
return implode("\n", $output);

View File

@ -48,6 +48,9 @@ $configurator->setTimeZone('Europe/Amsterdam');
$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp');
$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log');
// Add dev components
$configurator->addComponent(new \FuzeWorks\TracyComponent());
// Add Async library
$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks');

View File

@ -45,6 +45,9 @@ $configurator->setTimeZone('Europe/Amsterdam');
$configurator->setTempDirectory(dirname(__FILE__) . DIRECTORY_SEPARATOR . 'temp');
$configurator->setLogDirectory(dirname(__FILE__). DIRECTORY_SEPARATOR . 'log');
// Add dev components
$configurator->addComponent(new \FuzeWorks\TracyComponent());
// Add Async library
$configurator->deferComponentClassMethod('libraries', 'addLibraryClass', null, 'async', '\FuzeWorks\Async\Tasks');