Compare commits

..

No commits in common. "finetuning" and "master" have entirely different histories.

12 changed files with 25 additions and 401 deletions

View File

@ -109,17 +109,12 @@ try {
// And finally, run the supervisor // And finally, run the supervisor
try { try {
$supervisor = $lib->getSuperVisor($bootstrap); $supervisor = $lib->getSuperVisor($bootstrap);
$res = SuperVisor::RUNNING; while ($supervisor->cycle() !== SuperVisor::RUNNING) {
while ($res === SuperVisor::RUNNING) {
$res = $supervisor->cycle();
usleep(250000); usleep(250000);
} }
// Write results // Write results
if ($res === SuperVisor::CONSTRAINED) fwrite(STDOUT, "SuperVisor finished scheduled tasks.");
fwrite(STDOUT, "SuperVisor finished due to constrained tasks." . PHP_EOL);
else
fwrite(STDOUT, "SuperVisor finished due to finishing all tasks." . PHP_EOL);
} catch (InvalidArgumentException | TasksException | LibraryException $e) { } catch (InvalidArgumentException | TasksException | LibraryException $e) {
fwrite(STDERR, sprintf('FuzeWorks Async could not load.' . PHP_EOL . fwrite(STDERR, sprintf('FuzeWorks Async could not load.' . PHP_EOL .
'Exception: ' . $e->getMessage() . PHP_EOL) 'Exception: ' . $e->getMessage() . PHP_EOL)

View File

@ -44,10 +44,9 @@ interface Constraint
* Return true ONLY when this constraint changes the execution of the Task. Otherwise return false. * Return true ONLY when this constraint changes the execution of the Task. Otherwise return false.
* *
* @param Task $task * @param Task $task
* @param Task[] $tasks
* @return bool * @return bool
*/ */
public function intervene(Task $task, array $tasks): bool; public function intervene(Task $task): bool;
/** /**
* When intervene() returns true, this method should return the new status of the task. * When intervene() returns true, this method should return the new status of the task.

View File

@ -84,12 +84,12 @@ class DependencyConstraint implements Constraint
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function intervene(Task $task, array $tasks): bool public function intervene(Task $task): bool
{ {
// Fetch taskStorage // Fetch taskStorage
try { try {
$tasksLib = $this->loadTasksLib(); $tasks = $this->loadTasksLib();
$taskStorage = $tasksLib->getTaskStorage(); $taskStorage = $tasks->getTaskStorage();
// Is any dependency unresolved? // Is any dependency unresolved?
$hasUnresolved = false; $hasUnresolved = false;

View File

@ -70,7 +70,7 @@ class FixedTimeConstraint implements Constraint
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function intervene(Task $task, array $tasks): bool public function intervene(Task $task): bool
{ {
if ($task->getStatus() === Task::PENDING && time() < $this->timestamp) if ($task->getStatus() === Task::PENDING && time() < $this->timestamp)
return true; return true;

View File

@ -1,124 +0,0 @@
<?php
/**
* FuzeWorks Component.
*
* The FuzeWorks PHP FrameWork
*
* Copyright (C) 2013-2019 TechFuze
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* @author TechFuze
* @copyright Copyright (c) 2013 - 2019, TechFuze. (http://techfuze.net)
* @license https://opensource.org/licenses/MIT MIT License
*
* @link http://techfuze.net/fuzeworks
* @since Version 1.2.0
*
* @version Version 1.2.0
*/
namespace FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Task;
/**
* Class GroupConstraint
*
* Use this constraint to maximize the amount of concurrent tasks within a certain group.
* If the amount of running tasks equals the maximum concurrent tasks, this constraint will issue a delay.
* Useful when multiple tasks are not dependent on each other, but should be run separately.
*
* @package FuzeWorks\Async\Constraint
*/
class GroupConstraint implements Constraint
{
/**
* @var string
*/
protected $groupName;
/**
* @var int
*/
protected $maxConcurrent = 3;
protected $returnStatus = Task::DELAYED;
/**
* GroupConstraint constructor.
*
* To add a group, simply provide the $groupName and the maximum of concurrent tasks within this group.
*
* @param string $groupName
* @param int $maxConcurrent
*/
public function __construct(string $groupName, int $maxConcurrent = 3)
{
$this->groupName = $groupName;
$this->maxConcurrent = $maxConcurrent;
}
public function init(Task $task)
{
$task->addAttribute('groupName', $this->groupName);
}
public function getGroupName(): string
{
return $this->groupName;
}
public function intervene(Task $task, array $tasks): bool
{
// Find all tasks within this group and that are running
$runningTasks = 0;
foreach ($tasks as $t)
{
foreach ($t->getConstraints() as $constraint)
{
if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName())
{
if ($t->getStatus() === Task::RUNNING || $t->getStatus() === Task::POST)
$runningTasks++;
}
}
// If the running tasks exceeds the maximum, block this task from running
if ($runningTasks >= $this->maxConcurrent)
{
$this->returnStatus = Task::DELAYED;
return true;
}
}
return false;
}
public function blockCode(): int
{
return $this->returnStatus;
}
public function delayTime(): int
{
return time() + 1;
}
}

View File

@ -1,133 +0,0 @@
<?php
/**
* FuzeWorks Component.
*
* The FuzeWorks PHP FrameWork
*
* Copyright (C) 2013-2019 TechFuze
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* @author TechFuze
* @copyright Copyright (c) 2013 - 2019, TechFuze. (http://techfuze.net)
* @license https://opensource.org/licenses/MIT MIT License
*
* @link http://techfuze.net/fuzeworks
* @since Version 1.2.0
*
* @version Version 1.2.0
*/
namespace FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Constraint;
use FuzeWorks\Async\Task;
use FuzeWorks\Async\Tasks;
use FuzeWorks\Async\TasksException;
use FuzeWorks\Exception\FactoryException;
use FuzeWorks\Exception\LibraryException;
use FuzeWorks\Factory;
use FuzeWorks\Libraries;
class GroupDependencyConstraint implements Constraint
{
/**
* @var string
*/
protected $groupName;
protected $returnStatus = Task::DELAYED;
public function __construct(string $groupName)
{
$this->groupName = $groupName;
}
/**
* @inheritDoc
*/
public function intervene(Task $task, array $tasks): bool
{
// Find whether any task is not completed
foreach ($tasks as $t)
{
foreach ($t->getConstraints() as $constraint)
{
// Check whether the constraint is a GroupConstraint
if ($constraint instanceof GroupConstraint && $this->groupName === $constraint->getGroupName())
{
$this->returnStatus = Task::DELAYED;
return true;
}
}
}
// After all known tasks have run, check if all have ended without a CANCELLED status
$taskStorage = $this->loadTasksLib()->getTaskStorage();
$tasks = $taskStorage->getTasksByAttribute('groupName', $this->groupName);
foreach ($tasks as $t)
{
if ($t->getStatus() === Task::CANCELLED)
{
$this->returnStatus = Task::CANCELLED;
$task->setOutput('', 'Task cancelled due to failed group dependency.');
return true;
}
}
return false;
}
/**
* @inheritDoc
*/
public function blockCode(): int
{
return $this->returnStatus;
}
/**
* @inheritDoc
*/
public function delayTime(): int
{
return time() + 3;
}
/**
* Load the tasks library, so that dependencies can get scanned later
*
* @return Tasks
* @throws TasksException
*/
private function loadTasksLib(): Tasks
{
try {
/** @var Libraries $libraries */
$libraries = Factory::getInstance('libraries');
/** @var Tasks $tasks */
$tasks = $libraries->get('async');
return $tasks;
} catch (FactoryException | LibraryException $e) {
throw new TasksException("Could not constrain task. Async library could not be loaded.");
}
}
}

View File

@ -59,23 +59,13 @@ class ShellExecutor implements Executor
if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile'])) if (!isset($parameters['workerFile']) || !isset($parameters['bootstrapFile']))
throw new TasksException("Could not construct ShellExecutor. Parameter failure."); throw new TasksException("Could not construct ShellExecutor. Parameter failure.");
// Determine alternative worker file
$alternative = dirname(__DIR__, 4) . DIRECTORY_SEPARATOR . 'bin' . DIRECTORY_SEPARATOR . 'worker';
// Fetch workerFile // Fetch workerFile
$workerFile = null; $this->worker = $parameters['workerFile'];
if (file_exists($parameters['workerFile'])) if (!file_exists($this->worker))
$workerFile = $parameters['workerFile'];
elseif (file_exists($alternative))
$workerFile = $alternative;
// If no workerFile is found, throw an exception
if (is_null($workerFile))
throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist."); throw new TasksException("Could not construct ShellExecutor. ShellWorker script does not exist.");
// First determine the PHP binary // First determine the PHP binary
$this->binary = PHP_BINDIR . DS . 'php'; $this->binary = PHP_BINDIR . DS . 'php';
$this->worker = $workerFile;
$this->bootstrapFile = $parameters['bootstrapFile']; $this->bootstrapFile = $parameters['bootstrapFile'];
if (!file_exists($this->bootstrapFile)) if (!file_exists($this->bootstrapFile))
throw new TasksException("Could not construct ShellExecutor. No bootstrap file found."); throw new TasksException("Could not construct ShellExecutor. No bootstrap file found.");

View File

@ -149,9 +149,6 @@ class ControllerHandler implements Handler
*/ */
public function getOutput(): string public function getOutput(): string
{ {
if (!is_string($this->output))
return '';
return $this->output; return $this->output;
} }
@ -192,9 +189,6 @@ class ControllerHandler implements Handler
*/ */
public function getPostOutput(): string public function getPostOutput(): string
{ {
if (!is_string($this->postOutput))
return '';
return $this->postOutput; return $this->postOutput;
} }

View File

@ -62,7 +62,6 @@ class ParallelSuperVisor implements SuperVisor
{ {
$this->taskStorage = $taskStorage; $this->taskStorage = $taskStorage;
$this->executor = $executor; $this->executor = $executor;
gc_enable();
} }
/** /**
@ -89,10 +88,7 @@ class ParallelSuperVisor implements SuperVisor
// If the task changed status, task is no longer pending and should be processed by another statement // If the task changed status, task is no longer pending and should be processed by another statement
if ($task->getStatus() !== Task::PENDING) if ($task->getStatus() !== Task::PENDING)
{
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL);
continue; continue;
}
// Start the process using the executor service // Start the process using the executor service
$task = $this->executor->startTask($task); $task = $this->executor->startTask($task);
@ -101,16 +97,15 @@ class ParallelSuperVisor implements SuperVisor
// Modify the task in TaskStorage // Modify the task in TaskStorage
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
// DELAYED: If task is delayed, and enough time has passed, change the status back to pending // DELAYED: If task is delayed, and enough time has passed, change the status back to pending
// @todo Don't set to pending if still constrained
elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime()) elseif ($task->getStatus() === Task::DELAYED && time() > $task->getDelayTime())
{ {
$task->setStatus(Task::PENDING); $task->setStatus(Task::PENDING);
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
// RUNNING: check if task is still running. If not, set result based on output // RUNNING: check if task is still running. If not, set result based on output
@ -142,7 +137,7 @@ class ParallelSuperVisor implements SuperVisor
// If any changes have been made, they should be written to TaskStorage // If any changes have been made, they should be written to TaskStorage
$task->endTaskTime(); $task->endTaskTime();
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
// FAILED: if a process has failed, attempt to rety if requested to do so // FAILED: if a process has failed, attempt to rety if requested to do so
@ -166,7 +161,7 @@ class ParallelSuperVisor implements SuperVisor
$task->setStatus(Task::RUNNING); $task->setStatus(Task::RUNNING);
$task->startTaskTime(); $task->startTaskTime();
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
continue; continue;
} }
} }
@ -183,7 +178,7 @@ class ParallelSuperVisor implements SuperVisor
$task->setStatus(Task::CANCELLED); $task->setStatus(Task::CANCELLED);
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
// SUCCESS: if a task has succeeded, see if it needs a postHandler // SUCCESS: if a task has succeeded, see if it needs a postHandler
@ -200,7 +195,7 @@ class ParallelSuperVisor implements SuperVisor
$task->setStatus(Task::COMPLETED); $task->setStatus(Task::COMPLETED);
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
// POST: when a task is currently running in it's postHandler // POST: when a task is currently running in it's postHandler
@ -242,43 +237,37 @@ class ParallelSuperVisor implements SuperVisor
// If any changes have been made, they should be written to TaskStorage // If any changes have been made, they should be written to TaskStorage
$task->endPostTime(); $task->endPostTime();
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "Changed status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()) . PHP_EOL); fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
} }
// Check if all tasks are completed // Check if all tasks are completed
$allCompleted = true;
$anyDelayed = false; $anyDelayed = false;
$anyOther = false;
foreach ($this->tasks as $task) foreach ($this->tasks as $task)
{ {
if ($task->getStatus() === Task::DELAYED) if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
$allCompleted = false;
elseif ($task->getStatus() === Task::DELAYED)
$anyDelayed = true; $anyDelayed = true;
elseif ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
$anyOther = true;
} }
// If all are finished and none are delayed // If all are finished and none are delayed
if (!$anyOther && !$anyDelayed) if ($allCompleted && !$anyDelayed)
return SuperVisor::FINISHED; return SuperVisor::FINISHED;
elseif (!$anyOther && $anyDelayed) if ($allCompleted && $anyDelayed)
return SuperVisor::CONSTRAINED; return SuperVisor::CONSTRAINED;
else else
return SuperVisor::RUNNING; return SuperVisor::RUNNING;
} }
/**
* @todo Here it saves the constraint code regardless of whether there is a change. Fix!
*
* @param Task $task
* @return Task
* @throws TasksException
*/
private function testConstraints(Task $task): Task private function testConstraints(Task $task): Task
{ {
$constraints = $task->getConstraints(); $constraints = $task->getConstraints();
foreach ($constraints as $constraint) foreach ($constraints as $constraint)
{ {
if ($constraint->intervene($task, $this->tasks) && $constraint->blockCode() != 0) if ($constraint->intervene($task) && $constraint->blockCode() != 0)
{ {
$task->setStatus($constraint->blockCode()); $task->setStatus($constraint->blockCode());
if ($constraint->blockCode() === Task::DELAYED) if ($constraint->blockCode() === Task::DELAYED)
@ -286,6 +275,7 @@ class ParallelSuperVisor implements SuperVisor
// Save changes to TaskStorage // Save changes to TaskStorage
$this->taskStorage->modifyTask($task); $this->taskStorage->modifyTask($task);
fwrite(STDOUT, "\nChanged status of task '".$task->getId()."' to status " . Task::getStatusType($task->getStatus()));
} }
} }

View File

@ -262,9 +262,6 @@ class Task
*/ */
public function addConstraint(Constraint $constraint) public function addConstraint(Constraint $constraint)
{ {
if (method_exists($constraint, 'init'))
$constraint->init($this);
$this->constraints[] = $constraint; $this->constraints[] = $constraint;
} }
@ -322,16 +319,6 @@ class Task
/* ---------------------------------- Attributes setters and getters ------------------ */ /* ---------------------------------- Attributes setters and getters ------------------ */
/**
* Retrieve every attribute of this Task
*
* @return array
*/
public function getAttributes(): array
{
return $this->attributes;
}
/** /**
* Fetch an attribute of this task * Fetch an attribute of this task
* *

View File

@ -74,15 +74,6 @@ interface TaskStorage
*/ */
public function getTaskById(string $identifier): Task; public function getTaskById(string $identifier): Task;
/**
* Retrieve a list of all tasks that have a certain attribute
*
* @param string $attributeKey
* @param string $attributeValue
* @return Task[]
*/
public function getTasksByAttribute(string $attributeKey, string $attributeValue): array;
/** /**
* Modifies a task * Modifies a task
* *

View File

@ -54,8 +54,6 @@ class RedisTaskStorage implements TaskStorage
protected $indexSet = 'async_index'; protected $indexSet = 'async_index';
protected $unfinishedSet = 'async_index_unfinished'; protected $unfinishedSet = 'async_index_unfinished';
protected $attributesSet = 'async_attributes';
protected $attributesSetPrefix = 'async_attributes_';
protected $key_prefix = 'async_task_'; protected $key_prefix = 'async_task_';
/** /**
@ -117,22 +115,6 @@ class RedisTaskStorage implements TaskStorage
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
return false; return false;
// And create the attributes sets
foreach ($task->getAttributes() as $key => $val)
{
// If the attribute value is not a string it can't be used as a searchable attribute
if (!is_string($val))
continue;
// And finally add it to the set
$setKey = $this->attributesSetPrefix . $key . '_' . $val;
$this->conn->sAdd($setKey, $taskId);
// And add the attribute combo to the attributesSet
if (!$this->conn->sIsMember($this->attributesSet, $key . '_' . $val))
$this->conn->sAdd($this->attributesSet, $key . '_' . $val);
}
return true; return true;
} }
@ -173,22 +155,6 @@ class RedisTaskStorage implements TaskStorage
return $task; return $task;
} }
/**
* @inheritDoc
*/
public function getTasksByAttribute(string $attributeKey, string $attributeValue): array
{
// Fetch the taskList by the attribute
$setKey = $this->attributesSetPrefix . $attributeKey . '_' . $attributeValue;
$taskList = $this->conn->sMembers($setKey);
$tasks = [];
foreach ($taskList as $taskId)
$tasks[] = unserialize($this->conn->hGet($this->key_prefix . $taskId, 'data'));
return $tasks;
}
/** /**
* @inheritDoc * @inheritDoc
*/ */
@ -216,29 +182,6 @@ class RedisTaskStorage implements TaskStorage
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE) if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
return false; return false;
// Delete all the attributes
$allLoggedAttributes = $this->conn->sMembers($this->attributesSet);
foreach ($allLoggedAttributes as $attribute)
{
if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId))
$this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId);
}
// And write all the attributes
foreach ($task->getAttributes() as $key => $val)
{
// If the attribute value is not a string it can't be used as a searchable attribute
if (!is_string($val))
continue;
// And finally add it to the set
$setKey = $this->attributesSetPrefix . $key . '_' . $val;
$this->conn->sAdd($setKey, $taskId);
// And add the attribute combo to the attributesSet
$this->conn->sAdd($this->attributesSet, $key . '_' . $val);
}
// Modify the unfinished set // Modify the unfinished set
if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED )) if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED ))
$this->conn->sRem($this->unfinishedSet, $taskId); $this->conn->sRem($this->unfinishedSet, $taskId);
@ -269,14 +212,6 @@ class RedisTaskStorage implements TaskStorage
if ($this->conn->sIsMember($this->unfinishedSet, $taskId)) if ($this->conn->sIsMember($this->unfinishedSet, $taskId))
$this->conn->sRem($this->unfinishedSet, $taskId); $this->conn->sRem($this->unfinishedSet, $taskId);
// Delete all the attributes
$allLoggedAttributes = $this->conn->sMembers($this->attributesSet);
foreach ($allLoggedAttributes as $attribute)
{
if ($this->conn->sIsMember($this->attributesSetPrefix . $attribute, $taskId))
$this->conn->sRem($this->attributesSetPrefix . $attribute, $taskId);
}
// Delete the task itself // Delete the task itself
if ($this->conn->del($this->key_prefix . $taskId) > 0) if ($this->conn->del($this->key_prefix . $taskId) > 0)
return true; return true;