conn = new Redis(); // Afterwards connect to server $socketType = $parameters['socket_type']; if ($socketType == 'unix') $success = $this->conn->connect($parameters['socket']); elseif ($socketType == 'tcp') $success = $this->conn->connect($parameters['host'], $parameters['port'], $parameters['timeout']); else $success = false; // If unsuccessful, return false if (!$success) throw new TasksException("Could not construct RedisTaskStorage. Failed to connect."); // Otherwise attempt authentication, if needed if (isset($parameters['password']) && !$this->conn->auth($parameters['password'])) throw new TasksException("Could not construct RedisTaskStorage. Authentication failure."); } catch (RedisException $e) { throw new TasksException("Could not construct RedisTaskStorage. RedisException thrown: '" . $e->getMessage() . "'"); } } /** * @inheritDoc * @throws TasksException */ public function addTask(Task $task): bool { // Check if the task doesn't exist yet $taskId = $task->getId(); // Query the index $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if ($isMember) throw new TasksException("Could not add Task to TaskStorage. Task '$taskId' already exists."); // Serialize the task and save it $taskData = serialize($task); $this->conn->set($this->key_prefix . $taskId, $taskData); $this->conn->sAdd($this->indexSet, $taskId); return true; } /** * @inheritDoc */ public function readTasks(): array { return $this->refreshTasks(); } /** * @inheritDoc */ public function refreshTasks() { // First fetch an array of all tasks in the set $taskList = $this->conn->sMembers($this->indexSet); // Go over each taskId and fetch the specific task $tasks = []; foreach ($taskList as $taskId) $tasks[] = unserialize($this->conn->get($this->key_prefix . $taskId)); return $tasks; } /** * @inheritDoc */ public function getTaskById(string $identifier): Task { // Query the index $isMember = $this->conn->sIsMember($this->indexSet, $identifier); if (!$isMember) throw new TasksException("Could not get task by ID. Task not found."); // Fetch the task /** @var Task $task */ $task = unserialize($this->conn->get($this->key_prefix . $identifier)); // Return the task return $task; } /** * @inheritDoc */ public function modifyTask(Task $task): bool { // First get the task ID $taskId = $task->getId(); // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) throw new TasksException("Could not modify task. Task '$taskId' already exists."); // And write the data $taskData = serialize($task); return $this->conn->set($this->key_prefix . $taskId, $taskData); } /** * @inheritDoc * @throws TasksException */ public function deleteTask(Task $task): bool { // First get the task ID $taskId = $task->getId(); // Check if it exists $isMember = $this->conn->sIsMember($this->indexSet, $taskId); if (!$isMember) throw new TasksException("Could not modify task. Task '$taskId' already exists."); // Delete the key $this->conn->del($this->key_prefix . $taskId); $this->conn->sRem($this->indexSet, $taskId); // Remove all task output and post output $settings = $task->getRetrySettings(); $maxRetries = $settings['maxRetries']; for ($i=0;$i<=$maxRetries;$i++) { // First remove all possible task output if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $i)) $this->conn->del($this->key_prefix . $taskId . '_output_' . $i); if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $i)) $this->conn->del($this->key_prefix . $taskId . '_post_' . $i); } return true; } /** * @inheritDoc */ public function writeTaskOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool { // First get the task ID $taskId = $task->getId(); // Check if the key already exists if ($this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) throw new TasksException("Could not write task output. Output already written."); // Prepare contents $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; $data = serialize($contents); // Write contents return $this->conn->set($this->key_prefix . $taskId . '_output_' . $attempt, $data); } /** * @inheritDoc */ public function writePostOutput(Task $task, string $output, string $errors, int $statusCode, int $attempt = 0): bool { // First get the task ID $taskId = $task->getId(); // Check if the key already exists if ($this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) throw new TasksException("Could not write post output. Output already written."); // Prepare contents $contents = ['taskId' => $task->getId(), 'output' => $output, 'errors' => $errors, 'statusCode' => $statusCode]; $data = serialize($contents); // Write contents return $this->conn->set($this->key_prefix . $taskId . '_post_' . $attempt, $data); } /** * @inheritDoc */ public function readTaskOutput(Task $task, int $attempt = 0): ?array { // First get the task ID $taskId = $task->getId(); // Check if the key already exists if (!$this->conn->exists($this->key_prefix . $taskId . '_output_' . $attempt)) return null; // Load and convert the data $data = $this->conn->get($this->key_prefix . $taskId . '_output_' . $attempt); $data = unserialize($data); return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; } /** * @inheritDoc */ public function readPostOutput(Task $task, int $attempt = 0): ?array { // First get the task ID $taskId = $task->getId(); // Check if the key already exists if (!$this->conn->exists($this->key_prefix . $taskId . '_post_' . $attempt)) return null; // Load and convert the data $data = $this->conn->get($this->key_prefix . $taskId . '_post_' . $attempt); $data = unserialize($data); return ['output' => $data['output'], 'errors' => $data['errors'], 'statusCode' => $data['statusCode']]; } }