added new socket manager, errorhandling and testing is not finished yet

This commit is contained in:
kremsy 2015-06-21 20:43:18 +02:00
parent eaf8819a57
commit fc5a3e04b6
32 changed files with 2627 additions and 0 deletions

View File

@ -0,0 +1,14 @@
<?php
/**
* Created by PhpStorm.
* User: Lukas
* Date: 21.06.2015
* Time: 19:48
*/
namespace ManiaControl\Sockets;
interface SocketListener {
}

View File

@ -0,0 +1,64 @@
<?php
/**
* Created by PhpStorm.
* User: Lukas
* Date: 20.06.2015
* Time: 22:44
*/
namespace ManiaControl\Sockets;
use ManiaControl\Callbacks\Listening;
use ManiaControl\ManiaControl;
use React\EventLoop\Factory;
class SocketHandler {
/** @var ManiaControl $maniaControl */
private $maniaControl = null;
/** @var Listening[] $socketListenings */
private $socketListenings = array();
/**
* Create a new Socket Handler Instance
*
* @param ManiaControl $maniaControl
*/
public function __construct(ManiaControl $maniaControl) {
$this->maniaControl = $maniaControl;
}
public function createSocket() {
$loop = Factory::create();
$server = stream_socket_server('tcp://127.0.0.1:19999');
stream_set_blocking($server, 0);
$loop->addReadStream($server, function ($server) use ($loop) {
$conn = stream_socket_accept($server);
$data = "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nHi\n";
$loop->addWriteStream($conn, function ($conn) use (&$data, $loop) {
$written = fwrite($conn, $data);
if ($written === strlen($data)) {
fclose($conn);
$loop->removeStream($conn);
} else {
$data = substr($data, 0, $written);
}
});
});
$loop->addPeriodicTimer(5, function () {
$memory = memory_get_usage() / 1024;
$formatted = number_format($memory, 3) . 'K';
echo "Current memory usage: {$formatted}\n";
});
$loop->tick();
}
public function tick() {
}
}

View File

@ -0,0 +1,17 @@
<?php
/*
* This file is part of Evenement.
*
* (c) Igor Wiedler <igor@wiedler.ch>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Evenement;
class EventEmitter implements EventEmitterInterface
{
use EventEmitterTrait;
}

View File

@ -0,0 +1,22 @@
<?php
/*
* This file is part of Evenement.
*
* (c) Igor Wiedler <igor@wiedler.ch>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Evenement;
interface EventEmitterInterface
{
public function on($event, callable $listener);
public function once($event, callable $listener);
public function removeListener($event, callable $listener);
public function removeAllListeners($event = null);
public function listeners($event);
public function emit($event, array $arguments = []);
}

View File

@ -0,0 +1,68 @@
<?php
/*
* This file is part of Evenement.
*
* (c) Igor Wiedler <igor@wiedler.ch>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Evenement;
trait EventEmitterTrait
{
protected $listeners = [];
public function on($event, callable $listener)
{
if (!isset($this->listeners[$event])) {
$this->listeners[$event] = [];
}
$this->listeners[$event][] = $listener;
}
public function once($event, callable $listener)
{
$onceListener = function () use (&$onceListener, $event, $listener) {
$this->removeListener($event, $onceListener);
call_user_func_array($listener, func_get_args());
};
$this->on($event, $onceListener);
}
public function removeListener($event, callable $listener)
{
if (isset($this->listeners[$event])) {
$index = array_search($listener, $this->listeners[$event], true);
if (false !== $index) {
unset($this->listeners[$event][$index]);
}
}
}
public function removeAllListeners($event = null)
{
if ($event !== null) {
unset($this->listeners[$event]);
} else {
$this->listeners = [];
}
}
public function listeners($event)
{
return isset($this->listeners[$event]) ? $this->listeners[$event] : [];
}
public function emit($event, array $arguments = [])
{
foreach ($this->listeners($event) as $listener) {
call_user_func_array($listener, $arguments);
}
}
}

View File

@ -0,0 +1,327 @@
<?php
namespace React\EventLoop;
use Event;
use EventBase;
use EventConfig as EventBaseConfig;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/**
* An ext-event based React.
*/
class ExtEventLoop implements LoopInterface
{
private $eventBase;
private $nextTickQueue;
private $futureTickQueue;
private $timerCallback;
private $timerEvents;
private $streamCallback;
private $streamEvents = [];
private $streamFlags = [];
private $readListeners = [];
private $writeListeners = [];
private $running;
public function __construct(EventBaseConfig $config = null)
{
$this->eventBase = new EventBase($config);
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timerEvents = new SplObjectStorage();
$this->createTimerCallback();
$this->createStreamCallback();
}
/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->readListeners[$key])) {
$this->readListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, Event::READ);
}
}
/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->writeListeners[$key])) {
$this->writeListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, Event::WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
unset($this->readListeners[$key]);
$this->unsubscribeStreamEvent($stream, Event::READ);
}
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
unset($this->writeListeners[$key]);
$this->unsubscribeStreamEvent($stream, Event::WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$this->streamEvents[$key]->free();
unset(
$this->streamFlags[$key],
$this->streamEvents[$key],
$this->readListeners[$key],
$this->writeListeners[$key]
);
}
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if ($this->isTimerActive($timer)) {
$this->timerEvents[$timer]->free();
$this->timerEvents->detach($timer);
}
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timerEvents->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
@$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EventBase::LOOP_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EventBase::LOOP_NONBLOCK;
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
break;
}
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
@$this->eventBase->loop($flags);
}
}
/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}
/**
* Schedule a timer for execution.
*
* @param TimerInterface $timer
*/
private function scheduleTimer(TimerInterface $timer)
{
$flags = Event::TIMEOUT;
if ($timer->isPeriodic()) {
$flags |= Event::PERSIST;
}
$event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
$this->timerEvents[$timer] = $event;
$event->add($timer->getInterval());
}
/**
* Create a new ext-event Event object, or update the existing one.
*
* @param resource $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
$flags = ($this->streamFlags[$key] |= $flag);
$event->del();
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
} else {
$event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
$this->streamEvents[$key] = $event;
$this->streamFlags[$key] = $flag;
}
$event->add();
}
/**
* Update the ext-event Event object for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param resource $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
$flags = $this->streamFlags[$key] &= ~$flag;
if (0 === $flags) {
$this->removeStream($stream);
return;
}
$event = $this->streamEvents[$key];
$event->del();
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
$event->add();
}
/**
* Create a callback used as the target of timer events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createTimerCallback()
{
$this->timerCallback = function ($_, $_, $timer) {
call_user_func($timer->getCallback(), $timer);
if (!$timer->isPeriodic() && $this->isTimerActive($timer)) {
$this->cancelTimer($timer);
}
};
}
/**
* Create a callback used as the target of stream events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createStreamCallback()
{
$this->streamCallback = function ($stream, $flags) {
$key = (int) $stream;
if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) {
call_user_func($this->readListeners[$key], $stream, $this);
}
if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) {
call_user_func($this->writeListeners[$key], $stream, $this);
}
};
}
}

View File

@ -0,0 +1,21 @@
<?php
namespace React\EventLoop;
class Factory
{
public static function create()
{
// @codeCoverageIgnoreStart
if (function_exists('event_base_new')) {
return new LibEventLoop();
} elseif (class_exists('libev\EventLoop', false)) {
return new LibEvLoop;
} elseif (class_exists('EventBase', false)) {
return new ExtEventLoop;
}
return new StreamSelectLoop();
// @codeCoverageIgnoreEnd
}
}

View File

@ -0,0 +1,218 @@
<?php
namespace React\EventLoop;
use libev\EventLoop;
use libev\IOEvent;
use libev\TimerEvent;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/**
* @see https://github.com/m4rw3r/php-libev
* @see https://gist.github.com/1688204
*/
class LibEvLoop implements LoopInterface
{
private $loop;
private $nextTickQueue;
private $futureTickQueue;
private $timerEvents;
private $readEvents = [];
private $writeEvents = [];
private $running;
public function __construct()
{
$this->loop = new EventLoop();
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timerEvents = new SplObjectStorage();
}
/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{
$callback = function () use ($stream, $listener) {
call_user_func($listener, $stream, $this);
};
$event = new IOEvent($callback, $stream, IOEvent::READ);
$this->loop->add($event);
$this->readEvents[(int) $stream] = $event;
}
/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$callback = function () use ($stream, $listener) {
call_user_func($listener, $stream, $this);
};
$event = new IOEvent($callback, $stream, IOEvent::WRITE);
$this->loop->add($event);
$this->writeEvents[(int) $stream] = $event;
}
/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readEvents[$key])) {
$this->readEvents[$key]->stop();
unset($this->readEvents[$key]);
}
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeEvents[$key])) {
$this->writeEvents[$key]->stop();
unset($this->writeEvents[$key]);
}
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$this->removeReadStream($stream);
$this->removeWriteStream($stream);
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$callback = function () use ($timer) {
call_user_func($timer->getCallback(), $timer);
if ($this->isTimerActive($timer)) {
$this->cancelTimer($timer);
}
};
$event = new TimerEvent($callback, $timer->getInterval());
$this->timerEvents->attach($timer, $event);
$this->loop->add($event);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$callback = function () use ($timer) {
call_user_func($timer->getCallback(), $timer);
};
$event = new TimerEvent($callback, $interval, $interval);
$this->timerEvents->attach($timer, $event);
$this->loop->add($event);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if (isset($this->timerEvents[$timer])) {
$this->loop->remove($this->timerEvents[$timer]);
$this->timerEvents->detach($timer);
}
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timerEvents->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$this->loop->run(EventLoop::RUN_ONCE | EventLoop::RUN_NOWAIT);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EventLoop::RUN_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EventLoop::RUN_NOWAIT;
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
break;
}
$this->loop->run($flags);
}
}
/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}
}

View File

@ -0,0 +1,343 @@
<?php
namespace React\EventLoop;
use Event;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/**
* An ext-libevent based React.
*/
class LibEventLoop implements LoopInterface
{
const MICROSECONDS_PER_SECOND = 1000000;
private $eventBase;
private $nextTickQueue;
private $futureTickQueue;
private $timerCallback;
private $timerEvents;
private $streamCallback;
private $streamEvents = [];
private $streamFlags = [];
private $readListeners = [];
private $writeListeners = [];
private $running;
public function __construct()
{
$this->eventBase = event_base_new();
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timerEvents = new SplObjectStorage();
$this->createTimerCallback();
$this->createStreamCallback();
}
/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->readListeners[$key])) {
$this->readListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, EV_READ);
}
}
/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->writeListeners[$key])) {
$this->writeListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, EV_WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
unset($this->readListeners[$key]);
$this->unsubscribeStreamEvent($stream, EV_READ);
}
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
unset($this->writeListeners[$key]);
$this->unsubscribeStreamEvent($stream, EV_WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
event_del($event);
event_free($event);
unset(
$this->streamFlags[$key],
$this->streamEvents[$key],
$this->readListeners[$key],
$this->writeListeners[$key]
);
}
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if ($this->isTimerActive($timer)) {
$event = $this->timerEvents[$timer];
event_del($event);
event_free($event);
$this->timerEvents->detach($timer);
}
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timerEvents->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EVLOOP_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EVLOOP_NONBLOCK;
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
break;
}
event_base_loop($this->eventBase, $flags);
}
}
/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}
/**
* Schedule a timer for execution.
*
* @param TimerInterface $timer
*/
private function scheduleTimer(TimerInterface $timer)
{
$this->timerEvents[$timer] = $event = event_timer_new();
event_timer_set($event, $this->timerCallback, $timer);
event_base_set($event, $this->eventBase);
event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
}
/**
* Create a new ext-libevent event resource, or update the existing one.
*
* @param resource $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
$flags = $this->streamFlags[$key] |= $flag;
event_del($event);
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
} else {
$event = event_new();
event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);
event_base_set($event, $this->eventBase);
$this->streamEvents[$key] = $event;
$this->streamFlags[$key] = $flag;
}
event_add($event);
}
/**
* Update the ext-libevent event resource for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param resource $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
$flags = $this->streamFlags[$key] &= ~$flag;
if (0 === $flags) {
$this->removeStream($stream);
return;
}
$event = $this->streamEvents[$key];
event_del($event);
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
event_add($event);
}
/**
* Create a callback used as the target of timer events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createTimerCallback()
{
$this->timerCallback = function ($_, $_, $timer) {
call_user_func($timer->getCallback(), $timer);
// Timer already cancelled ...
if (!$this->isTimerActive($timer)) {
return;
// Reschedule periodic timers ...
} elseif ($timer->isPeriodic()) {
event_add(
$this->timerEvents[$timer],
$timer->getInterval() * self::MICROSECONDS_PER_SECOND
);
// Clean-up one shot timers ...
} else {
$this->cancelTimer($timer);
}
};
}
/**
* Create a callback used as the target of stream events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createStreamCallback()
{
$this->streamCallback = function ($stream, $flags) {
$key = (int) $stream;
if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) {
call_user_func($this->readListeners[$key], $stream, $this);
}
if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) {
call_user_func($this->writeListeners[$key], $stream, $this);
}
};
}
}

View File

@ -0,0 +1,121 @@
<?php
namespace React\EventLoop;
use React\EventLoop\Timer\TimerInterface;
interface LoopInterface
{
/**
* Register a listener to be notified when a stream is ready to read.
*
* @param resource $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addReadStream($stream, callable $listener);
/**
* Register a listener to be notified when a stream is ready to write.
*
* @param resource $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addWriteStream($stream, callable $listener);
/**
* Remove the read event listener for the given stream.
*
* @param resource $stream The PHP stream resource.
*/
public function removeReadStream($stream);
/**
* Remove the write event listener for the given stream.
*
* @param resource $stream The PHP stream resource.
*/
public function removeWriteStream($stream);
/**
* Remove all listeners for the given stream.
*
* @param resource $stream The PHP stream resource.
*/
public function removeStream($stream);
/**
* Enqueue a callback to be invoked once after the given interval.
*
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param int|float $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
public function addTimer($interval, callable $callback);
/**
* Enqueue a callback to be invoked repeatedly after the given interval.
*
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param int|float $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
public function addPeriodicTimer($interval, callable $callback);
/**
* Cancel a pending timer.
*
* @param TimerInterface $timer The timer to cancel.
*/
public function cancelTimer(TimerInterface $timer);
/**
* Check if a given timer is active.
*
* @param TimerInterface $timer The timer to check.
*
* @return boolean True if the timer is still enqueued for execution.
*/
public function isTimerActive(TimerInterface $timer);
/**
* Schedule a callback to be invoked on the next tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued,
* before any timer or stream events.
*
* @param callable $listener The callback to invoke.
*/
public function nextTick(callable $listener);
/**
* Schedule a callback to be invoked on a future tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued.
*
* @param callable $listener The callback to invoke.
*/
public function futureTick(callable $listener);
/**
* Perform a single iteration of the event loop.
*/
public function tick();
/**
* Run the event loop until there are no more tasks to perform.
*/
public function run();
/**
* Instruct a running event loop to stop.
*/
public function stop();
}

View File

@ -0,0 +1,262 @@
<?php
namespace React\EventLoop;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use React\EventLoop\Timer\Timers;
/**
* A stream_select() based React.
*/
class StreamSelectLoop implements LoopInterface
{
const MICROSECONDS_PER_SECOND = 1000000;
private $nextTickQueue;
private $futureTickQueue;
private $timers;
private $readStreams = [];
private $readListeners = [];
private $writeStreams = [];
private $writeListeners = [];
private $running;
public function __construct()
{
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timers = new Timers();
}
/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->readStreams[$key])) {
$this->readStreams[$key] = $stream;
$this->readListeners[$key] = $listener;
}
}
/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->writeStreams[$key])) {
$this->writeStreams[$key] = $stream;
$this->writeListeners[$key] = $listener;
}
}
/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
unset(
$this->readStreams[$key],
$this->readListeners[$key]
);
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
unset(
$this->writeStreams[$key],
$this->writeListeners[$key]
);
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$this->removeReadStream($stream);
$this->removeWriteStream($stream);
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->timers->add($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->timers->add($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
$this->timers->cancel($timer);
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timers->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$this->timers->tick();
$this->waitForStreamActivity(0);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$this->timers->tick();
// Next-tick or future-tick queues have pending callbacks ...
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$timeout = 0;
// There is a pending timer, only block until it is due ...
} elseif ($scheduledAt = $this->timers->getFirst()) {
$timeout = $scheduledAt - $this->timers->getTime();
if ($timeout < 0) {
$timeout = 0;
} else {
$timeout *= self::MICROSECONDS_PER_SECOND;
}
// The only possible event is stream activity, so wait forever ...
} elseif ($this->readStreams || $this->writeStreams) {
$timeout = null;
// There's nothing left to do ...
} else {
break;
}
$this->waitForStreamActivity($timeout);
}
}
/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}
/**
* Wait/check for stream activity, or until the next timer is due.
*/
private function waitForStreamActivity($timeout)
{
$read = $this->readStreams;
$write = $this->writeStreams;
$this->streamSelect($read, $write, $timeout);
foreach ($read as $stream) {
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
call_user_func($this->readListeners[$key], $stream, $this);
}
}
foreach ($write as $stream) {
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
call_user_func($this->writeListeners[$key], $stream, $this);
}
}
}
/**
* Emulate a stream_select() implementation that does not break when passed
* empty stream arrays.
*
* @param array &$read An array of read streams to select upon.
* @param array &$write An array of write streams to select upon.
* @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
*
* @return integer The total number of streams that are ready for read/write.
*/
protected function streamSelect(array &$read, array &$write, $timeout)
{
if ($read || $write) {
$except = null;
return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
}
usleep($timeout);
return 0;
}
}

View File

@ -0,0 +1,59 @@
<?php
namespace React\EventLoop\Tick;
use React\EventLoop\LoopInterface;
use SplQueue;
class FutureTickQueue
{
private $eventLoop;
private $queue;
/**
* @param LoopInterface $eventLoop The event loop passed as the first parameter to callbacks.
*/
public function __construct(LoopInterface $eventLoop)
{
$this->eventLoop = $eventLoop;
$this->queue = new SplQueue();
}
/**
* Add a callback to be invoked on a future tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued.
*
* @param callable $listener The callback to invoke.
*/
public function add(callable $listener)
{
$this->queue->enqueue($listener);
}
/**
* Flush the callback queue.
*/
public function tick()
{
// Only invoke as many callbacks as were on the queue when tick() was called.
$count = $this->queue->count();
while ($count--) {
call_user_func(
$this->queue->dequeue(),
$this->eventLoop
);
}
}
/**
* Check if the next tick queue is empty.
*
* @return boolean
*/
public function isEmpty()
{
return $this->queue->isEmpty();
}
}

View File

@ -0,0 +1,57 @@
<?php
namespace React\EventLoop\Tick;
use React\EventLoop\LoopInterface;
use SplQueue;
class NextTickQueue
{
private $eventLoop;
private $queue;
/**
* @param LoopInterface $eventLoop The event loop passed as the first parameter to callbacks.
*/
public function __construct(LoopInterface $eventLoop)
{
$this->eventLoop = $eventLoop;
$this->queue = new SplQueue();
}
/**
* Add a callback to be invoked on the next tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued,
* before any timer or stream events.
*
* @param callable $listener The callback to invoke.
*/
public function add(callable $listener)
{
$this->queue->enqueue($listener);
}
/**
* Flush the callback queue.
*/
public function tick()
{
while (!$this->queue->isEmpty()) {
call_user_func(
$this->queue->dequeue(),
$this->eventLoop
);
}
}
/**
* Check if the next tick queue is empty.
*
* @return boolean
*/
public function isEmpty()
{
return $this->queue->isEmpty();
}
}

View File

@ -0,0 +1,102 @@
<?php
namespace React\EventLoop\Timer;
use React\EventLoop\LoopInterface;
class Timer implements TimerInterface
{
const MIN_INTERVAL = 0.000001;
protected $loop;
protected $interval;
protected $callback;
protected $periodic;
protected $data;
/**
* Constructor initializes the fields of the Timer
*
* @param LoopInterface $loop The loop with which this timer is associated
* @param float $interval The interval after which this timer will execute, in seconds
* @param callable $callback The callback that will be executed when this timer elapses
* @param bool $periodic Whether the time is periodic
* @param mixed $data Arbitrary data associated with timer
*/
public function __construct(LoopInterface $loop, $interval, callable $callback, $periodic = false, $data = null)
{
if ($interval < self::MIN_INTERVAL) {
$interval = self::MIN_INTERVAL;
}
$this->loop = $loop;
$this->interval = (float) $interval;
$this->callback = $callback;
$this->periodic = (bool) $periodic;
$this->data = null;
}
/**
* {@inheritdoc}
*/
public function getLoop()
{
return $this->loop;
}
/**
* {@inheritdoc}
*/
public function getInterval()
{
return $this->interval;
}
/**
* {@inheritdoc}
*/
public function getCallback()
{
return $this->callback;
}
/**
* {@inheritdoc}
*/
public function setData($data)
{
$this->data = $data;
}
/**
* {@inheritdoc}
*/
public function getData()
{
return $this->data;
}
/**
* {@inheritdoc}
*/
public function isPeriodic()
{
return $this->periodic;
}
/**
* {@inheritdoc}
*/
public function isActive()
{
return $this->loop->isTimerActive($this);
}
/**
* {@inheritdoc}
*/
public function cancel()
{
$this->loop->cancelTimer($this);
}
}

View File

@ -0,0 +1,62 @@
<?php
namespace React\EventLoop\Timer;
use React\EventLoop\LoopInterface;
interface TimerInterface
{
/**
* Get the loop with which this timer is associated
*
* @return LoopInterface
*/
public function getLoop();
/**
* Get the interval after which this timer will execute, in seconds
*
* @return float
*/
public function getInterval();
/**
* Get the callback that will be executed when this timer elapses
*
* @return callable
*/
public function getCallback();
/**
* Set arbitrary data associated with timer
*
* @param mixed $data
*/
public function setData($data);
/**
* Get arbitrary data associated with timer
*
* @return mixed
*/
public function getData();
/**
* Determine whether the time is periodic
*
* @return bool
*/
public function isPeriodic();
/**
* Determine whether the time is active
*
* @return bool
*/
public function isActive();
/**
* Cancel this timer
*/
public function cancel();
}

View File

@ -0,0 +1,100 @@
<?php
namespace React\EventLoop\Timer;
use SplObjectStorage;
use SplPriorityQueue;
class Timers
{
private $time;
private $timers;
private $scheduler;
public function __construct()
{
$this->timers = new SplObjectStorage();
$this->scheduler = new SplPriorityQueue();
}
public function updateTime()
{
return $this->time = microtime(true);
}
public function getTime()
{
return $this->time ?: $this->updateTime();
}
public function add(TimerInterface $timer)
{
$interval = $timer->getInterval();
$scheduledAt = $interval + $this->getTime();
$this->timers->attach($timer, $scheduledAt);
$this->scheduler->insert($timer, -$scheduledAt);
}
public function contains(TimerInterface $timer)
{
return $this->timers->contains($timer);
}
public function cancel(TimerInterface $timer)
{
$this->timers->detach($timer);
}
public function getFirst()
{
while ($this->scheduler->count()) {
$timer = $this->scheduler->top();
if ($this->timers->contains($timer)) {
return $this->timers[$timer];
}
$this->scheduler->extract();
}
return null;
}
public function isEmpty()
{
return count($this->timers) === 0;
}
public function tick()
{
$time = $this->updateTime();
$timers = $this->timers;
$scheduler = $this->scheduler;
while (!$scheduler->isEmpty()) {
$timer = $scheduler->top();
if (!isset($timers[$timer])) {
$scheduler->extract();
$timers->detach($timer);
continue;
}
if ($timers[$timer] >= $time) {
break;
}
$scheduler->extract();
call_user_func($timer->getCallback(), $timer);
if ($timer->isPeriodic() && isset($timers[$timer])) {
$timers[$timer] = $scheduledAt = $timer->getInterval() + $time;
$scheduler->insert($timer, -$scheduledAt);
} else {
$timers->detach($timer);
}
}
}
}

View File

@ -0,0 +1,42 @@
<?php
namespace React\Socket;
use React\Stream\Stream;
class Connection extends Stream implements ConnectionInterface
{
public function handleData($stream)
{
// Socket is raw, not using fread as it's interceptable by filters
// See issues #192, #209, and #240
$data = stream_socket_recvfrom($stream, $this->bufferSize);
if ('' !== $data && false !== $data) {
$this->emit('data', array($data, $this));
}
if ('' === $data || false === $data || !is_resource($stream) || feof($stream)) {
$this->end();
}
}
public function handleClose()
{
if (is_resource($this->stream)) {
// http://chat.stackoverflow.com/transcript/message/7727858#7727858
stream_socket_shutdown($this->stream, STREAM_SHUT_RDWR);
stream_set_blocking($this->stream, false);
fclose($this->stream);
}
}
public function getRemoteAddress()
{
return $this->parseAddress(stream_socket_get_name($this->stream, true));
}
private function parseAddress($address)
{
return trim(substr($address, 0, strrpos($address, ':')), '[]');
}
}

View File

@ -0,0 +1,7 @@
<?php
namespace React\Socket;
class ConnectionException extends \ErrorException
{
}

View File

@ -0,0 +1,12 @@
<?php
namespace React\Socket;
use Evenement\EventEmitterInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\WritableStreamInterface;
interface ConnectionInterface extends ReadableStreamInterface, WritableStreamInterface
{
public function getRemoteAddress();
}

View File

@ -0,0 +1,71 @@
<?php
namespace React\Socket;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
/** @event connection */
class Server extends EventEmitter implements ServerInterface
{
public $master;
private $loop;
public function __construct(LoopInterface $loop)
{
$this->loop = $loop;
}
public function listen($port, $host = '127.0.0.1')
{
if (strpos($host, ':') !== false) {
// enclose IPv6 addresses in square brackets before appending port
$host = '[' . $host . ']';
}
$this->master = @stream_socket_server("tcp://$host:$port", $errno, $errstr);
if (false === $this->master) {
$message = "Could not bind to tcp://$host:$port: $errstr";
throw new ConnectionException($message, $errno);
}
stream_set_blocking($this->master, 0);
$this->loop->addReadStream($this->master, function ($master) {
$newSocket = stream_socket_accept($master);
if (false === $newSocket) {
$this->emit('error', array(new \RuntimeException('Error accepting new connection')));
return;
}
$this->handleConnection($newSocket);
});
}
public function handleConnection($socket)
{
stream_set_blocking($socket, 0);
$client = $this->createConnection($socket);
$this->emit('connection', array($client));
}
public function getPort()
{
$name = stream_socket_get_name($this->master, false);
return (int) substr(strrchr($name, ':'), 1);
}
public function shutdown()
{
$this->loop->removeStream($this->master);
fclose($this->master);
$this->removeAllListeners();
}
public function createConnection($socket)
{
return new Connection($socket, $this->loop);
}
}

View File

@ -0,0 +1,13 @@
<?php
namespace React\Socket;
use Evenement\EventEmitterInterface;
/** @event connection */
interface ServerInterface extends EventEmitterInterface
{
public function listen($port, $host = '127.0.0.1');
public function getPort();
public function shutdown();
}

View File

@ -0,0 +1,135 @@
<?php
namespace React\Stream;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
/** @event full-drain */
class Buffer extends EventEmitter implements WritableStreamInterface
{
public $stream;
public $listening = false;
public $softLimit = 2048;
private $writable = true;
private $loop;
private $data = '';
private $lastError = array(
'number' => 0,
'message' => '',
'file' => '',
'line' => 0,
);
public function __construct($stream, LoopInterface $loop)
{
$this->stream = $stream;
$this->loop = $loop;
}
public function isWritable()
{
return $this->writable;
}
public function write($data)
{
if (!$this->writable) {
return;
}
$this->data .= $data;
if (!$this->listening) {
$this->listening = true;
$this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
}
$belowSoftLimit = strlen($this->data) < $this->softLimit;
return $belowSoftLimit;
}
public function end($data = null)
{
if (null !== $data) {
$this->write($data);
}
$this->writable = false;
if ($this->listening) {
$this->on('full-drain', array($this, 'close'));
} else {
$this->close();
}
}
public function close()
{
$this->writable = false;
$this->listening = false;
$this->data = '';
$this->emit('close', [$this]);
}
public function handleWrite()
{
if (!is_resource($this->stream)) {
$this->emit('error', array(new \RuntimeException('Tried to write to invalid stream.'), $this));
return;
}
set_error_handler(array($this, 'errorHandler'));
$sent = fwrite($this->stream, $this->data);
restore_error_handler();
if (false === $sent) {
$this->emit('error', array(
new \ErrorException(
$this->lastError['message'],
0,
$this->lastError['number'],
$this->lastError['file'],
$this->lastError['line']
),
$this
));
return;
}
if (0 === $sent && feof($this->stream)) {
$this->emit('error', array(new \RuntimeException('Tried to write to closed stream.'), $this));
return;
}
$len = strlen($this->data);
if ($len >= $this->softLimit && $len - $sent < $this->softLimit) {
$this->emit('drain', [$this]);
}
$this->data = (string) substr($this->data, $sent);
if (0 === strlen($this->data)) {
$this->loop->removeWriteStream($this->stream);
$this->listening = false;
$this->emit('full-drain', [$this]);
}
}
private function errorHandler($errno, $errstr, $errfile, $errline)
{
$this->lastError['number'] = $errno;
$this->lastError['message'] = $errstr;
$this->lastError['file'] = $errfile;
$this->lastError['line'] = $errline;
}
}

View File

@ -0,0 +1,59 @@
<?php
namespace React\Stream;
use React\Promise\Deferred;
use React\Promise\PromisorInterface;
class BufferedSink extends WritableStream implements PromisorInterface
{
private $buffer = '';
private $deferred;
public function __construct()
{
$this->deferred = new Deferred();
$this->on('pipe', array($this, 'handlePipeEvent'));
$this->on('error', array($this, 'handleErrorEvent'));
}
public function handlePipeEvent($source)
{
Util::forwardEvents($source, $this, array('error'));
}
public function handleErrorEvent($e)
{
$this->deferred->reject($e);
}
public function write($data)
{
$this->buffer .= $data;
$this->deferred->progress($data);
}
public function close()
{
if ($this->closed) {
return;
}
parent::close();
$this->deferred->resolve($this->buffer);
}
public function promise()
{
return $this->deferred->promise();
}
public static function createPromise(ReadableStreamInterface $stream)
{
$sink = new static();
$stream->pipe($sink);
return $sink->promise();
}
}

View File

@ -0,0 +1,84 @@
<?php
namespace React\Stream;
use Evenement\EventEmitter;
class CompositeStream extends EventEmitter implements DuplexStreamInterface
{
protected $readable;
protected $writable;
protected $pipeSource;
public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
{
$this->readable = $readable;
$this->writable = $writable;
Util::forwardEvents($this->readable, $this, array('data', 'end', 'error', 'close'));
Util::forwardEvents($this->writable, $this, array('drain', 'error', 'close', 'pipe'));
$this->readable->on('close', array($this, 'close'));
$this->writable->on('close', array($this, 'close'));
$this->on('pipe', array($this, 'handlePipeEvent'));
}
public function handlePipeEvent($source)
{
$this->pipeSource = $source;
}
public function isReadable()
{
return $this->readable->isReadable();
}
public function pause()
{
if ($this->pipeSource) {
$this->pipeSource->pause();
}
$this->readable->pause();
}
public function resume()
{
if ($this->pipeSource) {
$this->pipeSource->resume();
}
$this->readable->resume();
}
public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);
return $dest;
}
public function isWritable()
{
return $this->writable->isWritable();
}
public function write($data)
{
return $this->writable->write($data);
}
public function end($data = null)
{
$this->writable->end($data);
}
public function close()
{
$this->pipeSource = null;
$this->readable->close();
$this->writable->close();
}
}

View File

@ -0,0 +1,7 @@
<?php
namespace React\Stream;
interface DuplexStreamInterface extends ReadableStreamInterface, WritableStreamInterface
{
}

View File

@ -0,0 +1,42 @@
<?php
namespace React\Stream;
use Evenement\EventEmitter;
class ReadableStream extends EventEmitter implements ReadableStreamInterface
{
protected $closed = false;
public function isReadable()
{
return !$this->closed;
}
public function pause()
{
}
public function resume()
{
}
public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);
return $dest;
}
public function close()
{
if ($this->closed) {
return;
}
$this->closed = true;
$this->emit('end', array($this));
$this->emit('close', array($this));
$this->removeAllListeners();
}
}

View File

@ -0,0 +1,20 @@
<?php
namespace React\Stream;
use Evenement\EventEmitterInterface;
/**
* @event data
* @event end
* @event error
* @event close
*/
interface ReadableStreamInterface extends EventEmitterInterface
{
public function isReadable();
public function pause();
public function resume();
public function pipe(WritableStreamInterface $dest, array $options = array());
public function close();
}

View File

@ -0,0 +1,141 @@
<?php
namespace React\Stream;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use InvalidArgumentException;
class Stream extends EventEmitter implements DuplexStreamInterface
{
public $bufferSize = 4096;
public $stream;
protected $readable = true;
protected $writable = true;
protected $closing = false;
protected $loop;
protected $buffer;
public function __construct($stream, LoopInterface $loop)
{
$this->stream = $stream;
if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
}
stream_set_blocking($this->stream, 0);
$this->loop = $loop;
$this->buffer = new Buffer($this->stream, $this->loop);
$this->buffer->on('error', function ($error) {
$this->emit('error', array($error, $this));
$this->close();
});
$this->buffer->on('drain', function () {
$this->emit('drain', array($this));
});
$this->resume();
}
public function isReadable()
{
return $this->readable;
}
public function isWritable()
{
return $this->writable;
}
public function pause()
{
$this->loop->removeReadStream($this->stream);
}
public function resume()
{
if ($this->readable) {
$this->loop->addReadStream($this->stream, array($this, 'handleData'));
}
}
public function write($data)
{
if (!$this->writable) {
return;
}
return $this->buffer->write($data);
}
public function close()
{
if (!$this->writable && !$this->closing) {
return;
}
$this->closing = false;
$this->readable = false;
$this->writable = false;
$this->emit('end', array($this));
$this->emit('close', array($this));
$this->loop->removeStream($this->stream);
$this->buffer->removeAllListeners();
$this->removeAllListeners();
$this->handleClose();
}
public function end($data = null)
{
if (!$this->writable) {
return;
}
$this->closing = true;
$this->readable = false;
$this->writable = false;
$this->buffer->on('close', function () {
$this->close();
});
$this->buffer->end($data);
}
public function pipe(WritableStreamInterface $dest, array $options = array())
{
Util::pipe($this, $dest, $options);
return $dest;
}
public function handleData($stream)
{
$data = fread($stream, $this->bufferSize);
$this->emit('data', array($data, $this));
if (!is_resource($stream) || feof($stream)) {
$this->end();
}
}
public function handleClose()
{
if (is_resource($this->stream)) {
fclose($this->stream);
}
}
public function getBuffer()
{
return $this->buffer;
}
}

View File

@ -0,0 +1,33 @@
<?php
namespace React\Stream;
class ThroughStream extends CompositeStream
{
public function __construct()
{
$readable = new ReadableStream();
$writable = new WritableStream();
parent::__construct($readable, $writable);
}
public function filter($data)
{
return $data;
}
public function write($data)
{
$this->readable->emit('data', array($this->filter($data), $this));
}
public function end($data = null)
{
if (null !== $data) {
$this->readable->emit('data', array($this->filter($data), $this));
}
$this->writable->end($data);
}
}

View File

@ -0,0 +1,45 @@
<?php
namespace React\Stream;
// TODO: move to a trait
class Util
{
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
{
// TODO: use stream_copy_to_stream
// it is 4x faster than this
// but can lose data under load with no way to recover it
$dest->emit('pipe', array($source));
$source->on('data', function ($data) use ($source, $dest) {
$feedMore = $dest->write($data);
if (false === $feedMore) {
$source->pause();
}
});
$dest->on('drain', function () use ($source) {
$source->resume();
});
$end = isset($options['end']) ? $options['end'] : true;
if ($end && $source !== $dest) {
$source->on('end', function () use ($dest) {
$dest->end();
});
}
}
public static function forwardEvents($source, $target, array $events)
{
foreach ($events as $event) {
$source->on($event, function () use ($event, $target) {
$target->emit($event, func_get_args());
});
}
}
}

View File

@ -0,0 +1,40 @@
<?php
namespace React\Stream;
use Evenement\EventEmitter;
class WritableStream extends EventEmitter implements WritableStreamInterface
{
protected $closed = false;
public function write($data)
{
}
public function end($data = null)
{
if (null !== $data) {
$this->write($data);
}
$this->close();
}
public function isWritable()
{
return !$this->closed;
}
public function close()
{
if ($this->closed) {
return;
}
$this->closed = true;
$this->emit('end', array($this));
$this->emit('close', array($this));
$this->removeAllListeners();
}
}

View File

@ -0,0 +1,19 @@
<?php
namespace React\Stream;
use Evenement\EventEmitterInterface;
/**
* @event drain
* @event error
* @event close
* @event pipe
*/
interface WritableStreamInterface extends EventEmitterInterface
{
public function isWritable();
public function write($data);
public function end($data = null);
public function close();
}