/usr/share/php/Predis/PubSub/AbstractPubSubContext.php is in libphp-predis 0.8.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 | <?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\PubSub;
use Predis\ClientException;
use Predis\ClientInterface;
use Predis\NotSupportedException;
/**
* Client-side abstraction of a Publish / Subscribe context.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
abstract class AbstractPubSubContext implements \Iterator
{
const SUBSCRIBE = 'subscribe';
const UNSUBSCRIBE = 'unsubscribe';
const PSUBSCRIBE = 'psubscribe';
const PUNSUBSCRIBE = 'punsubscribe';
const MESSAGE = 'message';
const PMESSAGE = 'pmessage';
const STATUS_VALID = 1; // 0b0001
const STATUS_SUBSCRIBED = 2; // 0b0010
const STATUS_PSUBSCRIBED = 4; // 0b0100
private $position = null;
private $statusFlags = self::STATUS_VALID;
/**
* Automatically closes the context when PHP's garbage collector kicks in.
*/
public function __destruct()
{
$this->closeContext(true);
}
/**
* Checks if the specified flag is valid in the state of the context.
*
* @param int $value Flag.
* @return Boolean
*/
protected function isFlagSet($value)
{
return ($this->statusFlags & $value) === $value;
}
/**
* Subscribes to the specified channels.
*
* @param mixed $arg,... One or more channel names.
*/
public function subscribe(/* arguments */)
{
$this->writeCommand(self::SUBSCRIBE, func_get_args());
$this->statusFlags |= self::STATUS_SUBSCRIBED;
}
/**
* Unsubscribes from the specified channels.
*
* @param mixed $arg,... One or more channel names.
*/
public function unsubscribe(/* arguments */)
{
$this->writeCommand(self::UNSUBSCRIBE, func_get_args());
}
/**
* Subscribes to the specified channels using a pattern.
*
* @param mixed $arg,... One or more channel name patterns.
*/
public function psubscribe(/* arguments */)
{
$this->writeCommand(self::PSUBSCRIBE, func_get_args());
$this->statusFlags |= self::STATUS_PSUBSCRIBED;
}
/**
* Unsubscribes from the specified channels using a pattern.
*
* @param mixed $arg,... One or more channel name patterns.
*/
public function punsubscribe(/* arguments */)
{
$this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
}
/**
* Closes the context by unsubscribing from all the subscribed channels.
* Optionally, the context can be forcefully closed by dropping the
* underlying connection.
*
* @param Boolean $force Forcefully close the context by closing the connection.
* @return Boolean Returns false if there are no pending messages.
*/
public function closeContext($force = false)
{
if (!$this->valid()) {
return false;
}
if ($force) {
$this->invalidate();
$this->disconnect();
} else {
if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
$this->unsubscribe();
}
if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
$this->punsubscribe();
}
}
return !$force;
}
/**
* Closes the underlying connection on forced disconnection.
*/
protected abstract function disconnect();
/**
* Writes a Redis command on the underlying connection.
*
* @param string $method ID of the command.
* @param array $arguments List of arguments.
*/
protected abstract function writeCommand($method, $arguments);
/**
* {@inheritdoc}
*/
public function rewind()
{
// NOOP
}
/**
* Returns the last message payload retrieved from the server and generated
* by one of the active subscriptions.
*
* @return array
*/
public function current()
{
return $this->getValue();
}
/**
* {@inheritdoc}
*/
public function key()
{
return $this->position;
}
/**
* {@inheritdoc}
*/
public function next()
{
if ($this->valid()) {
$this->position++;
}
return $this->position;
}
/**
* Checks if the the context is still in a valid state to continue.
*
* @return Boolean
*/
public function valid()
{
$isValid = $this->isFlagSet(self::STATUS_VALID);
$subscriptionFlags = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
$hasSubscriptions = ($this->statusFlags & $subscriptionFlags) > 0;
return $isValid && $hasSubscriptions;
}
/**
* Resets the state of the context.
*/
protected function invalidate()
{
$this->statusFlags = 0; // 0b0000;
}
/**
* Waits for a new message from the server generated by one of the active
* subscriptions and returns it when available.
*
* @return array
*/
protected abstract function getValue();
}
|