/usr/share/php/Predis/PubSub/PubSubContext.php is in libphp-predis 0.8.3-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 | <?php
/*
* This file is part of the Predis package.
*
* (c) Daniele Alessandri <suppakilla@gmail.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Predis\PubSub;
use Predis\ClientException;
use Predis\ClientInterface;
use Predis\Command\AbstractCommand as Command;
use Predis\NotSupportedException;
use Predis\Connection\AggregatedConnectionInterface;
/**
* Client-side abstraction of a Publish / Subscribe context.
*
* @author Daniele Alessandri <suppakilla@gmail.com>
*/
class PubSubContext extends AbstractPubSubContext
{
    /**
     * Client instance whose connection is used to write commands and read replies.
     *
     * @var ClientInterface
     */
    private $client;

    /**
     * Options passed at construction time (an empty array when none were given).
     *
     * @var array
     */
    private $options;

    /**
     * Validates the client, stores it together with the options and then
     * issues the initial SUBSCRIBE / PSUBSCRIBE requests for any 'subscribe'
     * or 'psubscribe' entries found in the options.
     *
     * @param ClientInterface $client  Client instance used by the context.
     * @param array           $options Options for the context initialization.
     *
     * @throws NotSupportedException When the client fails the capability checks.
     */
    public function __construct(ClientInterface $client, array $options = null)
    {
        $this->checkCapabilities($client);

        $this->options = $options ?: array();
        $this->client = $client;

        $this->genericSubscribeInit('subscribe');
        $this->genericSubscribeInit('psubscribe');
    }

    /**
     * Checks if the passed client instance satisfies the required conditions
     * needed to initialize a Publish / Subscribe context.
     *
     * @param ClientInterface $client Client instance used by the context.
     *
     * @throws NotSupportedException When the client uses an aggregated
     *                               connection, or when its profile does not
     *                               support every PUB/SUB related command.
     */
    private function checkCapabilities(ClientInterface $client)
    {
        if ($client->getConnection() instanceof AggregatedConnectionInterface) {
            throw new NotSupportedException('Cannot initialize a PUB/SUB context when using aggregated connections');
        }

        $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');

        if ($client->getProfile()->supportsCommands($commands) === false) {
            throw new NotSupportedException('The current profile does not support PUB/SUB related commands');
        }
    }

    /**
     * This method shares the logic to handle both SUBSCRIBE and PSUBSCRIBE.
     *
     * When the options contain an entry named after the action, the method of
     * the same name is invoked on this context with that entry as argument
     * (subscribe() / psubscribe() are not defined in this class; presumably
     * they are inherited from the parent).
     *
     * @param string $subscribeAction Type of subscription.
     */
    private function genericSubscribeInit($subscribeAction)
    {
        if (isset($this->options[$subscribeAction])) {
            $this->$subscribeAction($this->options[$subscribeAction]);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Normalizes the arguments, builds the command through the client and
     * writes it on the client connection. No reply is read here.
     */
    protected function writeCommand($method, $arguments)
    {
        $arguments = Command::normalizeArguments($arguments);
        $command = $this->client->createCommand($method, $arguments);

        $this->client->getConnection()->writeCommand($command);
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the disconnect() method of the underlying client.
     */
    protected function disconnect()
    {
        $this->client->disconnect();
    }

    /**
     * {@inheritdoc}
     *
     * Reads one reply from the connection and converts it into an object:
     *  - (p)subscribe, (p)unsubscribe and message replies expose the
     *    properties `kind`, `channel` and `payload`;
     *  - pmessage replies expose `kind`, `pattern`, `channel` and `payload`.
     *
     * @return object
     *
     * @throws ClientException When the reply cannot be indexed like an array
     *                         or when its message type is not recognized.
     */
    protected function getValue()
    {
        $response = $this->client->getConnection()->read();

        // Anything that cannot be indexed (for instance an error reply returned
        // as an object) would otherwise abort with a fatal "cannot use object
        // as array" error, or be silently string-indexed when it is a scalar.
        // Fail with the same exception type used below for unknown payloads.
        if (!is_array($response) && !($response instanceof \ArrayAccess)) {
            $type = is_object($response) ? get_class($response) : gettype($response);

            throw new ClientException(
                "Received an unexpected response of type $type inside of a pubsub context"
            );
        }

        switch ($response[0]) {
            case self::SUBSCRIBE:
            case self::UNSUBSCRIBE:
            case self::PSUBSCRIBE:
            case self::PUNSUBSCRIBE:
                // In the Redis Pub/Sub protocol the third element of these
                // replies is the number of subscriptions still active on the
                // connection: once it reaches zero the context is invalidated.
                if ($response[2] === 0) {
                    $this->invalidate();
                }
                // The missing break is intentional: (un)subscription replies
                // are returned to the caller in the same shape as messages.
                // no break

            case self::MESSAGE:
                return (object) array(
                    'kind'    => $response[0],
                    'channel' => $response[1],
                    'payload' => $response[2],
                );

            case self::PMESSAGE:
                return (object) array(
                    'kind'    => $response[0],
                    'pattern' => $response[1],
                    'channel' => $response[2],
                    'payload' => $response[3],
                );

            default:
                $message = "Received an unknown message type {$response[0]} inside of a pubsub context";
                throw new ClientException($message);
        }
    }
}
|