/usr/share/php/GuzzleHttp/Stream/AsyncReadStream.php is in php-guzzle-stream 3.0.0-5.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 | <?php
namespace GuzzleHttp\Stream;
/**
* Represents an asynchronous read-only stream that supports a drain event and
* pumping data from a source stream.
*
* The AsyncReadStream can be used as a completely asynchronous stream, meaning
* the data you can read from the stream will immediately return only
* the data that is currently buffered.
*
* AsyncReadStream can also be used in a "blocking" manner if a "pump" function
* is provided. When a caller requests more bytes than are available in the
* buffer, then the pump function is used to block until the requested number
* of bytes are available or the remote source stream has errored, closed, or
* timed-out. This behavior isn't strictly "blocking" because the pump function
* can send other transfers while waiting on the desired buffer size to be
* ready for reading (e.g., continue to tick an event loop).
*
* @unstable This class is subject to change.
*/
class AsyncReadStream implements StreamInterface
{
use StreamDecoratorTrait;
/** @var callable|null Fn used to notify writers the buffer has drained */
private $drain;
/** @var callable|null Fn used to block for more data */
private $pump;
/** @var int|null Highwater mark of the underlying buffer */
private $hwm;
/** @var bool Whether or not drain needs to be called at some point */
private $needsDrain;
/** @var int The expected size of the remote source */
private $size;
/**
* In order to utilize high water marks to tell writers to slow down, the
* provided stream must answer to the "hwm" stream metadata variable,
* providing the high water mark. If no "hwm" metadata value is available,
* then the "drain" functionality is not utilized.
*
* This class accepts an associative array of configuration options.
*
* - drain: (callable) Function to invoke when the stream has drained,
* meaning the buffer is now writable again because the size of the
* buffer is at an acceptable level (e.g., below the high water mark).
* The function accepts a single argument, the buffer stream object that
* has drained.
* - pump: (callable) A function that accepts the number of bytes to read
* from the source stream. This function will block until all of the data
* that was requested has been read, EOF of the source stream, or the
* source stream is closed.
* - size: (int) The expected size in bytes of the data that will be read
* (if known up-front).
*
* @param StreamInterface $buffer Buffer that contains the data that has
* been read by the event loop.
* @param array $config Associative array of options.
*
* @throws \InvalidArgumentException if the buffer is not readable and
* writable.
*/
public function __construct(
StreamInterface $buffer,
array $config = []
) {
if (!$buffer->isReadable() || !$buffer->isWritable()) {
throw new \InvalidArgumentException(
'Buffer must be readable and writable'
);
}
if (isset($config['size'])) {
$this->size = $config['size'];
}
static $callables = ['pump', 'drain'];
foreach ($callables as $check) {
if (isset($config[$check])) {
if (!is_callable($config[$check])) {
throw new \InvalidArgumentException(
$check . ' must be callable'
);
}
$this->{$check} = $config[$check];
}
}
$this->hwm = $buffer->getMetadata('hwm');
// Cannot drain when there's no high water mark.
if ($this->hwm === null) {
$this->drain = null;
}
$this->stream = $buffer;
}
/**
* Factory method used to create new async stream and an underlying buffer
* if no buffer is provided.
*
* This function accepts the same options as AsyncReadStream::__construct,
* but added the following key value pairs:
*
* - buffer: (StreamInterface) Buffer used to buffer data. If none is
* provided, a default buffer is created.
* - hwm: (int) High water mark to use if a buffer is created on your
* behalf.
* - max_buffer: (int) If provided, wraps the utilized buffer in a
* DroppingStream decorator to ensure that buffer does not exceed a given
* length. When exceeded, the stream will begin dropping data. Set the
* max_buffer to 0, to use a NullStream which does not store data.
* - write: (callable) A function that is invoked when data is written
* to the underlying buffer. The function accepts the buffer as the first
* argument, and the data being written as the second. The function MUST
* return the number of bytes that were written or false to let writers
* know to slow down.
* - drain: (callable) See constructor documentation.
* - pump: (callable) See constructor documentation.
*
* @param array $options Associative array of options.
*
* @return array Returns an array containing the buffer used to buffer
* data, followed by the ready to use AsyncReadStream object.
*/
public static function create(array $options = [])
{
$maxBuffer = isset($options['max_buffer'])
? $options['max_buffer']
: null;
if ($maxBuffer === 0) {
$buffer = new NullStream();
} elseif (isset($options['buffer'])) {
$buffer = $options['buffer'];
} else {
$hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
$buffer = new BufferStream($hwm);
}
if ($maxBuffer > 0) {
$buffer = new DroppingStream($buffer, $options['max_buffer']);
}
// Call the on_write callback if an on_write function was provided.
if (isset($options['write'])) {
$onWrite = $options['write'];
$buffer = FnStream::decorate($buffer, [
'write' => function ($string) use ($buffer, $onWrite) {
$result = $buffer->write($string);
$onWrite($buffer, $string);
return $result;
}
]);
}
return [$buffer, new self($buffer, $options)];
}
public function getSize()
{
return $this->size;
}
public function isWritable()
{
return false;
}
public function write($string)
{
return false;
}
public function read($length)
{
if (!$this->needsDrain && $this->drain) {
$this->needsDrain = $this->stream->getSize() >= $this->hwm;
}
$result = $this->stream->read($length);
// If we need to drain, then drain when the buffer is empty.
if ($this->needsDrain && $this->stream->getSize() === 0) {
$this->needsDrain = false;
$drainFn = $this->drain;
$drainFn($this->stream);
}
$resultLen = strlen($result);
// If a pump was provided, the buffer is still open, and not enough
// data was given, then block until the data is provided.
if ($this->pump && $resultLen < $length) {
$pumpFn = $this->pump;
$result .= $pumpFn($length - $resultLen);
}
return $result;
}
}
|