60 lines
1.2 KiB
PHP
60 lines
1.2 KiB
PHP
<?php
|
|
|
|
namespace React\Stream;
|
|
|
|
use React\Promise\Deferred;
|
|
use React\Promise\PromisorInterface;
|
|
|
|
class BufferedSink extends WritableStream implements PromisorInterface
|
|
{
|
|
private $buffer = '';
|
|
private $deferred;
|
|
|
|
public function __construct()
|
|
{
|
|
$this->deferred = new Deferred();
|
|
|
|
$this->on('pipe', array($this, 'handlePipeEvent'));
|
|
$this->on('error', array($this, 'handleErrorEvent'));
|
|
}
|
|
|
|
public function handlePipeEvent($source)
|
|
{
|
|
Util::forwardEvents($source, $this, array('error'));
|
|
}
|
|
|
|
public function handleErrorEvent($e)
|
|
{
|
|
$this->deferred->reject($e);
|
|
}
|
|
|
|
public function write($data)
|
|
{
|
|
$this->buffer .= $data;
|
|
$this->deferred->progress($data);
|
|
}
|
|
|
|
public function close()
|
|
{
|
|
if ($this->closed) {
|
|
return;
|
|
}
|
|
|
|
parent::close();
|
|
$this->deferred->resolve($this->buffer);
|
|
}
|
|
|
|
public function promise()
|
|
{
|
|
return $this->deferred->promise();
|
|
}
|
|
|
|
public static function createPromise(ReadableStreamInterface $stream)
|
|
{
|
|
$sink = new static();
|
|
$stream->pipe($sink);
|
|
|
|
return $sink->promise();
|
|
}
|
|
}
|