Release of RC1 #7
|
@ -70,8 +70,7 @@ class ParallelSuperVisor implements SuperVisor
|
|||
public function cycle(): int
|
||||
{
|
||||
// First: if there are no tasks, load them
|
||||
$this->taskStorage->refreshTasks();
|
||||
$this->tasks = $this->taskStorage->readTasks();
|
||||
$this->tasks = $this->taskStorage->readTasks(true);
|
||||
|
||||
// If there are still no tasks, nothing is queued, so this cycle can end.
|
||||
if (empty($this->tasks))
|
||||
|
|
|
@ -115,6 +115,16 @@ class Task
|
|||
*/
|
||||
protected $taskId;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $status = Task::PENDING;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $arguments;
|
||||
|
||||
/**
|
||||
* @var Handler
|
||||
*/
|
||||
|
@ -125,16 +135,6 @@ class Task
|
|||
*/
|
||||
protected $usePostHandler = false;
|
||||
|
||||
/**
|
||||
* @var array
|
||||
*/
|
||||
protected $arguments;
|
||||
|
||||
/**
|
||||
* @var int
|
||||
*/
|
||||
protected $status = Task::PENDING;
|
||||
|
||||
/**
|
||||
* @var Constraint[]
|
||||
*/
|
||||
|
@ -389,7 +389,6 @@ class Task
|
|||
return $this->postErrors;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param string $output
|
||||
* @param string $errors
|
||||
|
|
|
@ -58,14 +58,12 @@ interface TaskStorage
|
|||
/**
|
||||
* Retrieves a list of all tasks logged in the system
|
||||
*
|
||||
* When using $noIncludeDone, Tasks that are Completed and Cancelled will not be returned.
|
||||
*
|
||||
* @param bool $noIncludeDone
|
||||
* @return Task[]
|
||||
*/
|
||||
public function readTasks(): array;
|
||||
|
||||
/**
|
||||
* Reload the list of all tasks.
|
||||
*/
|
||||
public function refreshTasks();
|
||||
public function readTasks(bool $noIncludeDone = false): array;
|
||||
|
||||
/**
|
||||
* Retrieves an individual task by its identifier
|
||||
|
@ -107,7 +105,6 @@ interface TaskStorage
|
|||
* @param string $output
|
||||
* @param string $errors
|
||||
* @param int $statusCode
|
||||
* @param int $attempt
|
||||
* @return bool
|
||||
* @throws TasksException
|
||||
*/
|
||||
|
@ -122,7 +119,6 @@ interface TaskStorage
|
|||
* @param string $output
|
||||
* @param string $errors
|
||||
* @param int $statusCode
|
||||
* @param int $attempt
|
||||
* @return bool
|
||||
* @throws TasksException
|
||||
*/
|
||||
|
|
|
@ -100,16 +100,19 @@ class DummyTaskStorage implements TaskStorage
|
|||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function readTasks(): array
|
||||
public function readTasks(bool $noIncludeDone = false): array
|
||||
{
|
||||
return $this->tasks;
|
||||
}
|
||||
if ($noIncludeDone === false)
|
||||
return $this->tasks;
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function refreshTasks()
|
||||
{// Ignore
|
||||
$tasks = [];
|
||||
foreach ($this->tasks as $task)
|
||||
{
|
||||
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
|
||||
$tasks[] = $task;
|
||||
}
|
||||
|
||||
return $tasks;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -53,6 +53,7 @@ class RedisTaskStorage implements TaskStorage
|
|||
protected $conn;
|
||||
|
||||
protected $indexSet = 'async_index';
|
||||
protected $unfinishedSet = 'async_index_unfinished';
|
||||
protected $key_prefix = 'async_task_';
|
||||
|
||||
/**
|
||||
|
@ -107,6 +108,8 @@ class RedisTaskStorage implements TaskStorage
|
|||
|
||||
// Register the task
|
||||
$this->conn->sAdd($this->indexSet, $taskId);
|
||||
if ($task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
|
||||
$this->conn->sAdd($this->unfinishedSet, $taskId);
|
||||
|
||||
// And create a hash for it
|
||||
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
||||
|
@ -118,18 +121,13 @@ class RedisTaskStorage implements TaskStorage
|
|||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function readTasks(): array
|
||||
{
|
||||
return $this->refreshTasks();
|
||||
}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function refreshTasks()
|
||||
public function readTasks(bool $noIncludeDone = false): array
|
||||
{
|
||||
// First fetch an array of all tasks in the set
|
||||
$taskList = $this->conn->sMembers($this->indexSet);
|
||||
if ($noIncludeDone)
|
||||
$taskList = $this->conn->sMembers($this->unfinishedSet);
|
||||
else
|
||||
$taskList = $this->conn->sMembers($this->indexSet);
|
||||
|
||||
// Go over each taskId and fetch the specific task
|
||||
$tasks = [];
|
||||
|
@ -184,6 +182,12 @@ class RedisTaskStorage implements TaskStorage
|
|||
if ($this->conn->hSet($this->key_prefix . $taskId, 'data', $taskData) === FALSE)
|
||||
return false;
|
||||
|
||||
// Modify the unfinished set
|
||||
if ($this->conn->sIsMember($this->unfinishedSet, $taskId) && ($task->getStatus() === Task::COMPLETED || $task->getStatus() === Task::CANCELLED ))
|
||||
$this->conn->sRem($this->unfinishedSet, $taskId);
|
||||
elseif (!$this->conn->sIsMember($this->unfinishedSet, $taskId) && $task->getStatus() !== Task::COMPLETED && $task->getStatus() !== Task::CANCELLED)
|
||||
$this->conn->sAdd($this->unfinishedSet, $taskId);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -204,6 +208,10 @@ class RedisTaskStorage implements TaskStorage
|
|||
// Delete the task from the index
|
||||
$this->conn->sRem($this->indexSet, $taskId);
|
||||
|
||||
// And remove the task from the unfinishedSet
|
||||
if ($this->conn->sIsMember($this->unfinishedSet, $taskId))
|
||||
$this->conn->sRem($this->unfinishedSet, $taskId);
|
||||
|
||||
// Delete the task itself
|
||||
if ($this->conn->del($this->key_prefix . $taskId) > 0)
|
||||
return true;
|
||||
|
|
|
@ -39,7 +39,6 @@ use FuzeWorks\Async\Task;
|
|||
use FuzeWorks\Async\Tasks;
|
||||
use FuzeWorks\Async\TasksException;
|
||||
use FuzeWorks\Async\TaskStorage;
|
||||
use FuzeWorks\Async\TaskStorage\DummyTaskStorage;
|
||||
use FuzeWorks\Events;
|
||||
use FuzeWorks\Priority;
|
||||
use Mock\Handlers\EmptyHandler;
|
||||
|
@ -99,6 +98,32 @@ class TaskStorageTest extends TestCase
|
|||
$this->assertInstanceOf(EmptyHandler::class, $task->getHandler());
|
||||
}
|
||||
|
||||
/**
|
||||
* @depends testAddAndReadTasks
|
||||
*/
|
||||
public function testReadUnfinishedTasks()
|
||||
{
|
||||
// Add the tasks
|
||||
$finishedTask = new Task('finishedTask', new EmptyHandler());
|
||||
$finishedTask->setStatus(Task::COMPLETED);
|
||||
$unfinishedTask1 = new Task('unfinishedTask1', new EmptyHandler());
|
||||
$unfinishedTask2 = new Task('unfinishedTask2', new EmptyHandler());
|
||||
|
||||
// Nothing is written yet so it should be empty
|
||||
$this->assertEmpty($this->taskStorage->readTasks());
|
||||
|
||||
// Write the tasks to TaskStorage
|
||||
$this->taskStorage->addTask($finishedTask);
|
||||
$this->taskStorage->addTask($unfinishedTask1);
|
||||
$this->taskStorage->addTask($unfinishedTask2);
|
||||
|
||||
// And check whether they get properly read
|
||||
$this->assertCount(3, $this->taskStorage->readTasks());
|
||||
|
||||
// And whether the finished task gets omitted in the unfinished list
|
||||
$this->assertCount(2, $this->taskStorage->readTasks(true));
|
||||
}
|
||||
|
||||
/**
|
||||
* @depends testAddAndReadTasks
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue