From fc5a3e04b6e70adbd9f72e608a39ff1e47b6ab20 Mon Sep 17 00:00:00 2001 From: kremsy Date: Sun, 21 Jun 2015 20:43:18 +0200 Subject: [PATCH] added new socket manager, errorhandling and testing is not finished yet --- core/Sockets/SocketListener.php | 14 + core/Sockets/SocketManager.php | 64 ++++ libs/Evenement/EventEmitter.php | 17 + libs/Evenement/EventEmitterInterface.php | 22 ++ libs/Evenement/EventEmitterTrait.php | 68 ++++ libs/React/EventLoop/ExtEventLoop.php | 327 +++++++++++++++++ libs/React/EventLoop/Factory.php | 21 ++ libs/React/EventLoop/LibEvLoop.php | 218 +++++++++++ libs/React/EventLoop/LibEventLoop.php | 343 ++++++++++++++++++ libs/React/EventLoop/LoopInterface.php | 121 ++++++ libs/React/EventLoop/StreamSelectLoop.php | 262 +++++++++++++ libs/React/EventLoop/Tick/FutureTickQueue.php | 59 +++ libs/React/EventLoop/Tick/NextTickQueue.php | 57 +++ libs/React/EventLoop/Timer/Timer.php | 102 ++++++ libs/React/EventLoop/Timer/TimerInterface.php | 62 ++++ libs/React/EventLoop/Timer/Timers.php | 100 +++++ libs/React/Socket/Connection.php | 42 +++ libs/React/Socket/ConnectionException.php | 7 + libs/React/Socket/ConnectionInterface.php | 12 + libs/React/Socket/Server.php | 71 ++++ libs/React/Socket/ServerInterface.php | 13 + libs/React/Stream/Buffer.php | 135 +++++++ libs/React/Stream/BufferedSink.php | 59 +++ libs/React/Stream/CompositeStream.php | 84 +++++ libs/React/Stream/DuplexStreamInterface.php | 7 + libs/React/Stream/ReadableStream.php | 42 +++ libs/React/Stream/ReadableStreamInterface.php | 20 + libs/React/Stream/Stream.php | 141 +++++++ libs/React/Stream/ThroughStream.php | 33 ++ libs/React/Stream/Util.php | 45 +++ libs/React/Stream/WritableStream.php | 40 ++ libs/React/Stream/WritableStreamInterface.php | 19 + 32 files changed, 2627 insertions(+) create mode 100644 core/Sockets/SocketListener.php create mode 100644 core/Sockets/SocketManager.php create mode 100644 libs/Evenement/EventEmitter.php create mode 100644 libs/Evenement/EventEmitterInterface.php create mode 100644 libs/Evenement/EventEmitterTrait.php create mode 100644 libs/React/EventLoop/ExtEventLoop.php create mode 100644 libs/React/EventLoop/Factory.php create mode 100644 libs/React/EventLoop/LibEvLoop.php create mode 100644 libs/React/EventLoop/LibEventLoop.php create mode 100644 libs/React/EventLoop/LoopInterface.php create mode 100644 libs/React/EventLoop/StreamSelectLoop.php create mode 100644 libs/React/EventLoop/Tick/FutureTickQueue.php create mode 100644 libs/React/EventLoop/Tick/NextTickQueue.php create mode 100644 libs/React/EventLoop/Timer/Timer.php create mode 100644 libs/React/EventLoop/Timer/TimerInterface.php create mode 100644 libs/React/EventLoop/Timer/Timers.php create mode 100644 libs/React/Socket/Connection.php create mode 100644 libs/React/Socket/ConnectionException.php create mode 100644 libs/React/Socket/ConnectionInterface.php create mode 100644 libs/React/Socket/Server.php create mode 100644 libs/React/Socket/ServerInterface.php create mode 100644 libs/React/Stream/Buffer.php create mode 100644 libs/React/Stream/BufferedSink.php create mode 100644 libs/React/Stream/CompositeStream.php create mode 100644 libs/React/Stream/DuplexStreamInterface.php create mode 100644 libs/React/Stream/ReadableStream.php create mode 100644 libs/React/Stream/ReadableStreamInterface.php create mode 100644 libs/React/Stream/Stream.php create mode 100644 libs/React/Stream/ThroughStream.php create mode 100644 libs/React/Stream/Util.php create mode 100644 libs/React/Stream/WritableStream.php create mode 100644 libs/React/Stream/WritableStreamInterface.php diff --git a/core/Sockets/SocketListener.php b/core/Sockets/SocketListener.php new file mode 100644 index 00000000..b1afe95f --- /dev/null +++ b/core/Sockets/SocketListener.php @@ -0,0 +1,14 @@ +maniaControl = $maniaControl; + + } + + public function createSocket() { + $loop = Factory::create(); + $server = stream_socket_server('tcp://127.0.0.1:19999'); + stream_set_blocking($server, 0); + + $loop->addReadStream($server, function ($server) use ($loop) { + $conn = stream_socket_accept($server); + $data = "HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nHi\n"; + $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) { + $written = fwrite($conn, $data); + if ($written === strlen($data)) { + fclose($conn); + $loop->removeStream($conn); + } else { + $data = substr($data, 0, $written); + } + }); + }); + + $loop->addPeriodicTimer(5, function () { + $memory = memory_get_usage() / 1024; + $formatted = number_format($memory, 3) . 'K'; + echo "Current memory usage: {$formatted}\n"; + }); + + $loop->tick(); + } + + public function tick() { + + } +} \ No newline at end of file diff --git a/libs/Evenement/EventEmitter.php b/libs/Evenement/EventEmitter.php new file mode 100644 index 00000000..0ef51fc2 --- /dev/null +++ b/libs/Evenement/EventEmitter.php @@ -0,0 +1,17 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Evenement; + +class EventEmitter implements EventEmitterInterface +{ + use EventEmitterTrait; +} diff --git a/libs/Evenement/EventEmitterInterface.php b/libs/Evenement/EventEmitterInterface.php new file mode 100644 index 00000000..9b0b136a --- /dev/null +++ b/libs/Evenement/EventEmitterInterface.php @@ -0,0 +1,22 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Evenement; + +interface EventEmitterInterface +{ + public function on($event, callable $listener); + public function once($event, callable $listener); + public function removeListener($event, callable $listener); + public function removeAllListeners($event = null); + public function listeners($event); + public function emit($event, array $arguments = []); +} diff --git a/libs/Evenement/EventEmitterTrait.php b/libs/Evenement/EventEmitterTrait.php new file mode 100644 index 00000000..b4a4a3a4 --- /dev/null +++ b/libs/Evenement/EventEmitterTrait.php @@ -0,0 +1,68 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Evenement; + +trait EventEmitterTrait +{ + protected $listeners = []; + + public function on($event, callable $listener) + { + if (!isset($this->listeners[$event])) { + $this->listeners[$event] = []; + } + + $this->listeners[$event][] = $listener; + } + + public function once($event, callable $listener) + { + $onceListener = function () use (&$onceListener, $event, $listener) { + $this->removeListener($event, $onceListener); + + call_user_func_array($listener, func_get_args()); + }; + + $this->on($event, $onceListener); + } + + public function removeListener($event, callable $listener) + { + if (isset($this->listeners[$event])) { + $index = array_search($listener, $this->listeners[$event], true); + if (false !== $index) { + unset($this->listeners[$event][$index]); + } + } + } + + public function removeAllListeners($event = null) + { + if ($event !== null) { + unset($this->listeners[$event]); + } else { + $this->listeners = []; + } + } + + public function listeners($event) + { + return isset($this->listeners[$event]) ? $this->listeners[$event] : []; + } + + public function emit($event, array $arguments = []) + { + foreach ($this->listeners($event) as $listener) { + call_user_func_array($listener, $arguments); + } + } +} diff --git a/libs/React/EventLoop/ExtEventLoop.php b/libs/React/EventLoop/ExtEventLoop.php new file mode 100644 index 00000000..b3d16b94 --- /dev/null +++ b/libs/React/EventLoop/ExtEventLoop.php @@ -0,0 +1,327 @@ +eventBase = new EventBase($config); + $this->nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + $this->timerEvents = new SplObjectStorage(); + + $this->createTimerCallback(); + $this->createStreamCallback(); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->readListeners[$key])) { + $this->readListeners[$key] = $listener; + $this->subscribeStreamEvent($stream, Event::READ); + } + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->writeListeners[$key])) { + $this->writeListeners[$key] = $listener; + $this->subscribeStreamEvent($stream, Event::WRITE); + } + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + $key = (int) $stream; + + if (isset($this->readListeners[$key])) { + unset($this->readListeners[$key]); + $this->unsubscribeStreamEvent($stream, Event::READ); + } + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + $key = (int) $stream; + + if (isset($this->writeListeners[$key])) { + unset($this->writeListeners[$key]); + $this->unsubscribeStreamEvent($stream, Event::WRITE); + } + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + $key = (int) $stream; + + if (isset($this->streamEvents[$key])) { + $this->streamEvents[$key]->free(); + + unset( + $this->streamFlags[$key], + $this->streamEvents[$key], + $this->readListeners[$key], + $this->writeListeners[$key] + ); + } + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, false); + + $this->scheduleTimer($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, true); + + $this->scheduleTimer($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + if ($this->isTimerActive($timer)) { + $this->timerEvents[$timer]->free(); + $this->timerEvents->detach($timer); + } + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timerEvents->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + // @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226 + @$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK); + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $flags = EventBase::LOOP_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags |= EventBase::LOOP_NONBLOCK; + } elseif (!$this->streamEvents && !$this->timerEvents->count()) { + break; + } + + // @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226 + @$this->eventBase->loop($flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + /** + * Schedule a timer for execution. + * + * @param TimerInterface $timer + */ + private function scheduleTimer(TimerInterface $timer) + { + $flags = Event::TIMEOUT; + + if ($timer->isPeriodic()) { + $flags |= Event::PERSIST; + } + + $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer); + $this->timerEvents[$timer] = $event; + + $event->add($timer->getInterval()); + } + + /** + * Create a new ext-event Event object, or update the existing one. + * + * @param resource $stream + * @param integer $flag Event::READ or Event::WRITE + */ + private function subscribeStreamEvent($stream, $flag) + { + $key = (int) $stream; + + if (isset($this->streamEvents[$key])) { + $event = $this->streamEvents[$key]; + $flags = ($this->streamFlags[$key] |= $flag); + + $event->del(); + $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback); + } else { + $event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback); + + $this->streamEvents[$key] = $event; + $this->streamFlags[$key] = $flag; + } + + $event->add(); + } + + /** + * Update the ext-event Event object for this stream to stop listening to + * the given event type, or remove it entirely if it's no longer needed. + * + * @param resource $stream + * @param integer $flag Event::READ or Event::WRITE + */ + private function unsubscribeStreamEvent($stream, $flag) + { + $key = (int) $stream; + + $flags = $this->streamFlags[$key] &= ~$flag; + + if (0 === $flags) { + $this->removeStream($stream); + + return; + } + + $event = $this->streamEvents[$key]; + + $event->del(); + $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback); + $event->add(); + } + + /** + * Create a callback used as the target of timer events. + * + * A reference is kept to the callback for the lifetime of the loop + * to prevent "Cannot destroy active lambda function" fatal error from + * the event extension. + */ + private function createTimerCallback() + { + $this->timerCallback = function ($_, $_, $timer) { + call_user_func($timer->getCallback(), $timer); + + if (!$timer->isPeriodic() && $this->isTimerActive($timer)) { + $this->cancelTimer($timer); + } + }; + } + + /** + * Create a callback used as the target of stream events. + * + * A reference is kept to the callback for the lifetime of the loop + * to prevent "Cannot destroy active lambda function" fatal error from + * the event extension. + */ + private function createStreamCallback() + { + $this->streamCallback = function ($stream, $flags) { + $key = (int) $stream; + + if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) { + call_user_func($this->readListeners[$key], $stream, $this); + } + + if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) { + call_user_func($this->writeListeners[$key], $stream, $this); + } + }; + } +} diff --git a/libs/React/EventLoop/Factory.php b/libs/React/EventLoop/Factory.php new file mode 100644 index 00000000..9a481e35 --- /dev/null +++ b/libs/React/EventLoop/Factory.php @@ -0,0 +1,21 @@ +loop = new EventLoop(); + $this->nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + $this->timerEvents = new SplObjectStorage(); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $callback = function () use ($stream, $listener) { + call_user_func($listener, $stream, $this); + }; + + $event = new IOEvent($callback, $stream, IOEvent::READ); + $this->loop->add($event); + + $this->readEvents[(int) $stream] = $event; + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $callback = function () use ($stream, $listener) { + call_user_func($listener, $stream, $this); + }; + + $event = new IOEvent($callback, $stream, IOEvent::WRITE); + $this->loop->add($event); + + $this->writeEvents[(int) $stream] = $event; + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + $key = (int) $stream; + + if (isset($this->readEvents[$key])) { + $this->readEvents[$key]->stop(); + unset($this->readEvents[$key]); + } + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + $key = (int) $stream; + + if (isset($this->writeEvents[$key])) { + $this->writeEvents[$key]->stop(); + unset($this->writeEvents[$key]); + } + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + $this->removeReadStream($stream); + $this->removeWriteStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, false); + + $callback = function () use ($timer) { + call_user_func($timer->getCallback(), $timer); + + if ($this->isTimerActive($timer)) { + $this->cancelTimer($timer); + } + }; + + $event = new TimerEvent($callback, $timer->getInterval()); + $this->timerEvents->attach($timer, $event); + $this->loop->add($event); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, true); + + $callback = function () use ($timer) { + call_user_func($timer->getCallback(), $timer); + }; + + $event = new TimerEvent($callback, $interval, $interval); + $this->timerEvents->attach($timer, $event); + $this->loop->add($event); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + if (isset($this->timerEvents[$timer])) { + $this->loop->remove($this->timerEvents[$timer]); + $this->timerEvents->detach($timer); + } + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timerEvents->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $this->loop->run(EventLoop::RUN_ONCE | EventLoop::RUN_NOWAIT); + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $flags = EventLoop::RUN_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags |= EventLoop::RUN_NOWAIT; + } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) { + break; + } + + $this->loop->run($flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } +} diff --git a/libs/React/EventLoop/LibEventLoop.php b/libs/React/EventLoop/LibEventLoop.php new file mode 100644 index 00000000..1ab2aa3e --- /dev/null +++ b/libs/React/EventLoop/LibEventLoop.php @@ -0,0 +1,343 @@ +eventBase = event_base_new(); + $this->nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + $this->timerEvents = new SplObjectStorage(); + + $this->createTimerCallback(); + $this->createStreamCallback(); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->readListeners[$key])) { + $this->readListeners[$key] = $listener; + $this->subscribeStreamEvent($stream, EV_READ); + } + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->writeListeners[$key])) { + $this->writeListeners[$key] = $listener; + $this->subscribeStreamEvent($stream, EV_WRITE); + } + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + $key = (int) $stream; + + if (isset($this->readListeners[$key])) { + unset($this->readListeners[$key]); + $this->unsubscribeStreamEvent($stream, EV_READ); + } + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + $key = (int) $stream; + + if (isset($this->writeListeners[$key])) { + unset($this->writeListeners[$key]); + $this->unsubscribeStreamEvent($stream, EV_WRITE); + } + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + $key = (int) $stream; + + if (isset($this->streamEvents[$key])) { + $event = $this->streamEvents[$key]; + + event_del($event); + event_free($event); + + unset( + $this->streamFlags[$key], + $this->streamEvents[$key], + $this->readListeners[$key], + $this->writeListeners[$key] + ); + } + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, false); + + $this->scheduleTimer($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, true); + + $this->scheduleTimer($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + if ($this->isTimerActive($timer)) { + $event = $this->timerEvents[$timer]; + + event_del($event); + event_free($event); + + $this->timerEvents->detach($timer); + } + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timerEvents->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK); + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $flags = EVLOOP_ONCE; + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $flags |= EVLOOP_NONBLOCK; + } elseif (!$this->streamEvents && !$this->timerEvents->count()) { + break; + } + + event_base_loop($this->eventBase, $flags); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + /** + * Schedule a timer for execution. + * + * @param TimerInterface $timer + */ + private function scheduleTimer(TimerInterface $timer) + { + $this->timerEvents[$timer] = $event = event_timer_new(); + + event_timer_set($event, $this->timerCallback, $timer); + event_base_set($event, $this->eventBase); + event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND); + } + + /** + * Create a new ext-libevent event resource, or update the existing one. + * + * @param resource $stream + * @param integer $flag EV_READ or EV_WRITE + */ + private function subscribeStreamEvent($stream, $flag) + { + $key = (int) $stream; + + if (isset($this->streamEvents[$key])) { + $event = $this->streamEvents[$key]; + $flags = $this->streamFlags[$key] |= $flag; + + event_del($event); + event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback); + } else { + $event = event_new(); + + event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback); + event_base_set($event, $this->eventBase); + + $this->streamEvents[$key] = $event; + $this->streamFlags[$key] = $flag; + } + + event_add($event); + } + + /** + * Update the ext-libevent event resource for this stream to stop listening to + * the given event type, or remove it entirely if it's no longer needed. + * + * @param resource $stream + * @param integer $flag EV_READ or EV_WRITE + */ + private function unsubscribeStreamEvent($stream, $flag) + { + $key = (int) $stream; + + $flags = $this->streamFlags[$key] &= ~$flag; + + if (0 === $flags) { + $this->removeStream($stream); + + return; + } + + $event = $this->streamEvents[$key]; + + event_del($event); + event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback); + event_add($event); + } + + /** + * Create a callback used as the target of timer events. + * + * A reference is kept to the callback for the lifetime of the loop + * to prevent "Cannot destroy active lambda function" fatal error from + * the event extension. + */ + private function createTimerCallback() + { + $this->timerCallback = function ($_, $_, $timer) { + call_user_func($timer->getCallback(), $timer); + + // Timer already cancelled ... + if (!$this->isTimerActive($timer)) { + return; + + // Reschedule periodic timers ... + } elseif ($timer->isPeriodic()) { + event_add( + $this->timerEvents[$timer], + $timer->getInterval() * self::MICROSECONDS_PER_SECOND + ); + + // Clean-up one shot timers ... + } else { + $this->cancelTimer($timer); + } + }; + } + + /** + * Create a callback used as the target of stream events. + * + * A reference is kept to the callback for the lifetime of the loop + * to prevent "Cannot destroy active lambda function" fatal error from + * the event extension. + */ + private function createStreamCallback() + { + $this->streamCallback = function ($stream, $flags) { + $key = (int) $stream; + + if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) { + call_user_func($this->readListeners[$key], $stream, $this); + } + + if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) { + call_user_func($this->writeListeners[$key], $stream, $this); + } + }; + } +} diff --git a/libs/React/EventLoop/LoopInterface.php b/libs/React/EventLoop/LoopInterface.php new file mode 100644 index 00000000..d046526c --- /dev/null +++ b/libs/React/EventLoop/LoopInterface.php @@ -0,0 +1,121 @@ +nextTickQueue = new NextTickQueue($this); + $this->futureTickQueue = new FutureTickQueue($this); + $this->timers = new Timers(); + } + + /** + * {@inheritdoc} + */ + public function addReadStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->readStreams[$key])) { + $this->readStreams[$key] = $stream; + $this->readListeners[$key] = $listener; + } + } + + /** + * {@inheritdoc} + */ + public function addWriteStream($stream, callable $listener) + { + $key = (int) $stream; + + if (!isset($this->writeStreams[$key])) { + $this->writeStreams[$key] = $stream; + $this->writeListeners[$key] = $listener; + } + } + + /** + * {@inheritdoc} + */ + public function removeReadStream($stream) + { + $key = (int) $stream; + + unset( + $this->readStreams[$key], + $this->readListeners[$key] + ); + } + + /** + * {@inheritdoc} + */ + public function removeWriteStream($stream) + { + $key = (int) $stream; + + unset( + $this->writeStreams[$key], + $this->writeListeners[$key] + ); + } + + /** + * {@inheritdoc} + */ + public function removeStream($stream) + { + $this->removeReadStream($stream); + $this->removeWriteStream($stream); + } + + /** + * {@inheritdoc} + */ + public function addTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, false); + + $this->timers->add($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function addPeriodicTimer($interval, callable $callback) + { + $timer = new Timer($this, $interval, $callback, true); + + $this->timers->add($timer); + + return $timer; + } + + /** + * {@inheritdoc} + */ + public function cancelTimer(TimerInterface $timer) + { + $this->timers->cancel($timer); + } + + /** + * {@inheritdoc} + */ + public function isTimerActive(TimerInterface $timer) + { + return $this->timers->contains($timer); + } + + /** + * {@inheritdoc} + */ + public function nextTick(callable $listener) + { + $this->nextTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function futureTick(callable $listener) + { + $this->futureTickQueue->add($listener); + } + + /** + * {@inheritdoc} + */ + public function tick() + { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $this->timers->tick(); + + $this->waitForStreamActivity(0); + } + + /** + * {@inheritdoc} + */ + public function run() + { + $this->running = true; + + while ($this->running) { + $this->nextTickQueue->tick(); + + $this->futureTickQueue->tick(); + + $this->timers->tick(); + + // Next-tick or future-tick queues have pending callbacks ... + if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) { + $timeout = 0; + + // There is a pending timer, only block until it is due ... + } elseif ($scheduledAt = $this->timers->getFirst()) { + $timeout = $scheduledAt - $this->timers->getTime(); + if ($timeout < 0) { + $timeout = 0; + } else { + $timeout *= self::MICROSECONDS_PER_SECOND; + } + + // The only possible event is stream activity, so wait forever ... + } elseif ($this->readStreams || $this->writeStreams) { + $timeout = null; + + // There's nothing left to do ... + } else { + break; + } + + $this->waitForStreamActivity($timeout); + } + } + + /** + * {@inheritdoc} + */ + public function stop() + { + $this->running = false; + } + + /** + * Wait/check for stream activity, or until the next timer is due. + */ + private function waitForStreamActivity($timeout) + { + $read = $this->readStreams; + $write = $this->writeStreams; + + $this->streamSelect($read, $write, $timeout); + + foreach ($read as $stream) { + $key = (int) $stream; + + if (isset($this->readListeners[$key])) { + call_user_func($this->readListeners[$key], $stream, $this); + } + } + + foreach ($write as $stream) { + $key = (int) $stream; + + if (isset($this->writeListeners[$key])) { + call_user_func($this->writeListeners[$key], $stream, $this); + } + } + } + + /** + * Emulate a stream_select() implementation that does not break when passed + * empty stream arrays. + * + * @param array &$read An array of read streams to select upon. + * @param array &$write An array of write streams to select upon. + * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever. + * + * @return integer The total number of streams that are ready for read/write. + */ + protected function streamSelect(array &$read, array &$write, $timeout) + { + if ($read || $write) { + $except = null; + + return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout); + } + + usleep($timeout); + + return 0; + } +} diff --git a/libs/React/EventLoop/Tick/FutureTickQueue.php b/libs/React/EventLoop/Tick/FutureTickQueue.php new file mode 100644 index 00000000..eeffd363 --- /dev/null +++ b/libs/React/EventLoop/Tick/FutureTickQueue.php @@ -0,0 +1,59 @@ +eventLoop = $eventLoop; + $this->queue = new SplQueue(); + } + + /** + * Add a callback to be invoked on a future tick of the event loop. + * + * Callbacks are guaranteed to be executed in the order they are enqueued. + * + * @param callable $listener The callback to invoke. + */ + public function add(callable $listener) + { + $this->queue->enqueue($listener); + } + + /** + * Flush the callback queue. + */ + public function tick() + { + // Only invoke as many callbacks as were on the queue when tick() was called. + $count = $this->queue->count(); + + while ($count--) { + call_user_func( + $this->queue->dequeue(), + $this->eventLoop + ); + } + } + + /** + * Check if the next tick queue is empty. + * + * @return boolean + */ + public function isEmpty() + { + return $this->queue->isEmpty(); + } +} diff --git a/libs/React/EventLoop/Tick/NextTickQueue.php b/libs/React/EventLoop/Tick/NextTickQueue.php new file mode 100644 index 00000000..5b8e1de8 --- /dev/null +++ b/libs/React/EventLoop/Tick/NextTickQueue.php @@ -0,0 +1,57 @@ +eventLoop = $eventLoop; + $this->queue = new SplQueue(); + } + + /** + * Add a callback to be invoked on the next tick of the event loop. + * + * Callbacks are guaranteed to be executed in the order they are enqueued, + * before any timer or stream events. + * + * @param callable $listener The callback to invoke. + */ + public function add(callable $listener) + { + $this->queue->enqueue($listener); + } + + /** + * Flush the callback queue. + */ + public function tick() + { + while (!$this->queue->isEmpty()) { + call_user_func( + $this->queue->dequeue(), + $this->eventLoop + ); + } + } + + /** + * Check if the next tick queue is empty. + * + * @return boolean + */ + public function isEmpty() + { + return $this->queue->isEmpty(); + } +} diff --git a/libs/React/EventLoop/Timer/Timer.php b/libs/React/EventLoop/Timer/Timer.php new file mode 100644 index 00000000..f670ab3c --- /dev/null +++ b/libs/React/EventLoop/Timer/Timer.php @@ -0,0 +1,102 @@ +loop = $loop; + $this->interval = (float) $interval; + $this->callback = $callback; + $this->periodic = (bool) $periodic; + $this->data = null; + } + + /** + * {@inheritdoc} + */ + public function getLoop() + { + return $this->loop; + } + + /** + * {@inheritdoc} + */ + public function getInterval() + { + return $this->interval; + } + + /** + * {@inheritdoc} + */ + public function getCallback() + { + return $this->callback; + } + + /** + * {@inheritdoc} + */ + public function setData($data) + { + $this->data = $data; + } + + /** + * {@inheritdoc} + */ + public function getData() + { + return $this->data; + } + + /** + * {@inheritdoc} + */ + public function isPeriodic() + { + return $this->periodic; + } + + /** + * {@inheritdoc} + */ + public function isActive() + { + return $this->loop->isTimerActive($this); + } + + /** + * {@inheritdoc} + */ + public function cancel() + { + $this->loop->cancelTimer($this); + } +} diff --git a/libs/React/EventLoop/Timer/TimerInterface.php b/libs/React/EventLoop/Timer/TimerInterface.php new file mode 100644 index 00000000..d066f369 --- /dev/null +++ b/libs/React/EventLoop/Timer/TimerInterface.php @@ -0,0 +1,62 @@ +timers = new SplObjectStorage(); + $this->scheduler = new SplPriorityQueue(); + } + + public function updateTime() + { + return $this->time = microtime(true); + } + + public function getTime() + { + return $this->time ?: $this->updateTime(); + } + + public function add(TimerInterface $timer) + { + $interval = $timer->getInterval(); + $scheduledAt = $interval + $this->getTime(); + + $this->timers->attach($timer, $scheduledAt); + $this->scheduler->insert($timer, -$scheduledAt); + } + + public function contains(TimerInterface $timer) + { + return $this->timers->contains($timer); + } + + public function cancel(TimerInterface $timer) + { + $this->timers->detach($timer); + } + + public function getFirst() + { + while ($this->scheduler->count()) { + $timer = $this->scheduler->top(); + + if ($this->timers->contains($timer)) { + return $this->timers[$timer]; + } + + $this->scheduler->extract(); + } + + return null; + } + + public function isEmpty() + { + return count($this->timers) === 0; + } + + public function tick() + { + $time = $this->updateTime(); + $timers = $this->timers; + $scheduler = $this->scheduler; + + while (!$scheduler->isEmpty()) { + $timer = $scheduler->top(); + + if (!isset($timers[$timer])) { + $scheduler->extract(); + $timers->detach($timer); + + continue; + } + + if ($timers[$timer] >= $time) { + break; + } + + $scheduler->extract(); + call_user_func($timer->getCallback(), $timer); + + if ($timer->isPeriodic() && isset($timers[$timer])) { + $timers[$timer] = $scheduledAt = $timer->getInterval() + $time; + $scheduler->insert($timer, -$scheduledAt); + } else { + $timers->detach($timer); + } + } + } +} diff --git a/libs/React/Socket/Connection.php b/libs/React/Socket/Connection.php new file mode 100644 index 00000000..2bdf2180 --- /dev/null +++ b/libs/React/Socket/Connection.php @@ -0,0 +1,42 @@ +bufferSize); + if ('' !== $data && false !== $data) { + $this->emit('data', array($data, $this)); + } + + if ('' === $data || false === $data || !is_resource($stream) || feof($stream)) { + $this->end(); + } + } + + public function handleClose() + { + if (is_resource($this->stream)) { + // http://chat.stackoverflow.com/transcript/message/7727858#7727858 + stream_socket_shutdown($this->stream, STREAM_SHUT_RDWR); + stream_set_blocking($this->stream, false); + fclose($this->stream); + } + } + + public function getRemoteAddress() + { + return $this->parseAddress(stream_socket_get_name($this->stream, true)); + } + + private function parseAddress($address) + { + return trim(substr($address, 0, strrpos($address, ':')), '[]'); + } +} diff --git a/libs/React/Socket/ConnectionException.php b/libs/React/Socket/ConnectionException.php new file mode 100644 index 00000000..72b10280 --- /dev/null +++ b/libs/React/Socket/ConnectionException.php @@ -0,0 +1,7 @@ +loop = $loop; + } + + public function listen($port, $host = '127.0.0.1') + { + if (strpos($host, ':') !== false) { + // enclose IPv6 addresses in square brackets before appending port + $host = '[' . $host . ']'; + } + + $this->master = @stream_socket_server("tcp://$host:$port", $errno, $errstr); + if (false === $this->master) { + $message = "Could not bind to tcp://$host:$port: $errstr"; + throw new ConnectionException($message, $errno); + } + stream_set_blocking($this->master, 0); + + $this->loop->addReadStream($this->master, function ($master) { + $newSocket = stream_socket_accept($master); + if (false === $newSocket) { + $this->emit('error', array(new \RuntimeException('Error accepting new connection'))); + + return; + } + $this->handleConnection($newSocket); + }); + } + + public function handleConnection($socket) + { + stream_set_blocking($socket, 0); + + $client = $this->createConnection($socket); + + $this->emit('connection', array($client)); + } + + public function getPort() + { + $name = stream_socket_get_name($this->master, false); + + return (int) substr(strrchr($name, ':'), 1); + } + + public function shutdown() + { + $this->loop->removeStream($this->master); + fclose($this->master); + $this->removeAllListeners(); + } + + public function createConnection($socket) + { + return new Connection($socket, $this->loop); + } +} diff --git a/libs/React/Socket/ServerInterface.php b/libs/React/Socket/ServerInterface.php new file mode 100644 index 00000000..3665a165 --- /dev/null +++ b/libs/React/Socket/ServerInterface.php @@ -0,0 +1,13 @@ + 0, + 'message' => '', + 'file' => '', + 'line' => 0, + ); + + public function __construct($stream, LoopInterface $loop) + { + $this->stream = $stream; + $this->loop = $loop; + } + + public function isWritable() + { + return $this->writable; + } + + public function write($data) + { + if (!$this->writable) { + return; + } + + $this->data .= $data; + + if (!$this->listening) { + $this->listening = true; + + $this->loop->addWriteStream($this->stream, array($this, 'handleWrite')); + } + + $belowSoftLimit = strlen($this->data) < $this->softLimit; + + return $belowSoftLimit; + } + + public function end($data = null) + { + if (null !== $data) { + $this->write($data); + } + + $this->writable = false; + + if ($this->listening) { + $this->on('full-drain', array($this, 'close')); + } else { + $this->close(); + } + } + + public function close() + { + $this->writable = false; + $this->listening = false; + $this->data = ''; + + $this->emit('close', [$this]); + } + + public function handleWrite() + { + if (!is_resource($this->stream)) { + $this->emit('error', array(new \RuntimeException('Tried to write to invalid stream.'), $this)); + + return; + } + + set_error_handler(array($this, 'errorHandler')); + + $sent = fwrite($this->stream, $this->data); + + restore_error_handler(); + + if (false === $sent) { + $this->emit('error', array( + new \ErrorException( + $this->lastError['message'], + 0, + $this->lastError['number'], + $this->lastError['file'], + $this->lastError['line'] + ), + $this + )); + + return; + } + + if (0 === $sent && feof($this->stream)) { + $this->emit('error', array(new \RuntimeException('Tried to write to closed stream.'), $this)); + + return; + } + + $len = strlen($this->data); + if ($len >= $this->softLimit && $len - $sent < $this->softLimit) { + $this->emit('drain', [$this]); + } + + $this->data = (string) substr($this->data, $sent); + + if (0 === strlen($this->data)) { + $this->loop->removeWriteStream($this->stream); + $this->listening = false; + + $this->emit('full-drain', [$this]); + } + } + + private function errorHandler($errno, $errstr, $errfile, $errline) + { + $this->lastError['number'] = $errno; + $this->lastError['message'] = $errstr; + $this->lastError['file'] = $errfile; + $this->lastError['line'] = $errline; + } +} diff --git a/libs/React/Stream/BufferedSink.php b/libs/React/Stream/BufferedSink.php new file mode 100644 index 00000000..a6b35c01 --- /dev/null +++ b/libs/React/Stream/BufferedSink.php @@ -0,0 +1,59 @@ +deferred = new Deferred(); + + $this->on('pipe', array($this, 'handlePipeEvent')); + $this->on('error', array($this, 'handleErrorEvent')); + } + + public function handlePipeEvent($source) + { + Util::forwardEvents($source, $this, array('error')); + } + + public function handleErrorEvent($e) + { + $this->deferred->reject($e); + } + + public function write($data) + { + $this->buffer .= $data; + $this->deferred->progress($data); + } + + public function close() + { + if ($this->closed) { + return; + } + + parent::close(); + $this->deferred->resolve($this->buffer); + } + + public function promise() + { + return $this->deferred->promise(); + } + + public static function createPromise(ReadableStreamInterface $stream) + { + $sink = new static(); + $stream->pipe($sink); + + return $sink->promise(); + } +} diff --git a/libs/React/Stream/CompositeStream.php b/libs/React/Stream/CompositeStream.php new file mode 100644 index 00000000..b2f88b0a --- /dev/null +++ b/libs/React/Stream/CompositeStream.php @@ -0,0 +1,84 @@ +readable = $readable; + $this->writable = $writable; + + Util::forwardEvents($this->readable, $this, array('data', 'end', 'error', 'close')); + Util::forwardEvents($this->writable, $this, array('drain', 'error', 'close', 'pipe')); + + $this->readable->on('close', array($this, 'close')); + $this->writable->on('close', array($this, 'close')); + + $this->on('pipe', array($this, 'handlePipeEvent')); + } + + public function handlePipeEvent($source) + { + $this->pipeSource = $source; + } + + public function isReadable() + { + return $this->readable->isReadable(); + } + + public function pause() + { + if ($this->pipeSource) { + $this->pipeSource->pause(); + } + + $this->readable->pause(); + } + + public function resume() + { + if ($this->pipeSource) { + $this->pipeSource->resume(); + } + + $this->readable->resume(); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function isWritable() + { + return $this->writable->isWritable(); + } + + public function write($data) + { + return $this->writable->write($data); + } + + public function end($data = null) + { + $this->writable->end($data); + } + + public function close() + { + $this->pipeSource = null; + + $this->readable->close(); + $this->writable->close(); + } +} diff --git a/libs/React/Stream/DuplexStreamInterface.php b/libs/React/Stream/DuplexStreamInterface.php new file mode 100644 index 00000000..bd370d01 --- /dev/null +++ b/libs/React/Stream/DuplexStreamInterface.php @@ -0,0 +1,7 @@ +closed; + } + + public function pause() + { + } + + public function resume() + { + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->emit('end', array($this)); + $this->emit('close', array($this)); + $this->removeAllListeners(); + } +} diff --git a/libs/React/Stream/ReadableStreamInterface.php b/libs/React/Stream/ReadableStreamInterface.php new file mode 100644 index 00000000..312fc47e --- /dev/null +++ b/libs/React/Stream/ReadableStreamInterface.php @@ -0,0 +1,20 @@ +stream = $stream; + if (!is_resource($this->stream) || get_resource_type($this->stream) !== "stream") { + throw new InvalidArgumentException('First parameter must be a valid stream resource'); + } + + stream_set_blocking($this->stream, 0); + + $this->loop = $loop; + $this->buffer = new Buffer($this->stream, $this->loop); + + $this->buffer->on('error', function ($error) { + $this->emit('error', array($error, $this)); + $this->close(); + }); + + $this->buffer->on('drain', function () { + $this->emit('drain', array($this)); + }); + + $this->resume(); + } + + public function isReadable() + { + return $this->readable; + } + + public function isWritable() + { + return $this->writable; + } + + public function pause() + { + $this->loop->removeReadStream($this->stream); + } + + public function resume() + { + if ($this->readable) { + $this->loop->addReadStream($this->stream, array($this, 'handleData')); + } + } + + public function write($data) + { + if (!$this->writable) { + return; + } + + return $this->buffer->write($data); + } + + public function close() + { + if (!$this->writable && !$this->closing) { + return; + } + + $this->closing = false; + + $this->readable = false; + $this->writable = false; + + $this->emit('end', array($this)); + $this->emit('close', array($this)); + $this->loop->removeStream($this->stream); + $this->buffer->removeAllListeners(); + $this->removeAllListeners(); + + $this->handleClose(); + } + + public function end($data = null) + { + if (!$this->writable) { + return; + } + + $this->closing = true; + + $this->readable = false; + $this->writable = false; + + $this->buffer->on('close', function () { + $this->close(); + }); + + $this->buffer->end($data); + } + + public function pipe(WritableStreamInterface $dest, array $options = array()) + { + Util::pipe($this, $dest, $options); + + return $dest; + } + + public function handleData($stream) + { + $data = fread($stream, $this->bufferSize); + + $this->emit('data', array($data, $this)); + + if (!is_resource($stream) || feof($stream)) { + $this->end(); + } + } + + public function handleClose() + { + if (is_resource($this->stream)) { + fclose($this->stream); + } + } + + public function getBuffer() + { + return $this->buffer; + } +} diff --git a/libs/React/Stream/ThroughStream.php b/libs/React/Stream/ThroughStream.php new file mode 100644 index 00000000..a6a47495 --- /dev/null +++ b/libs/React/Stream/ThroughStream.php @@ -0,0 +1,33 @@ +readable->emit('data', array($this->filter($data), $this)); + } + + public function end($data = null) + { + if (null !== $data) { + $this->readable->emit('data', array($this->filter($data), $this)); + } + + $this->writable->end($data); + } +} diff --git a/libs/React/Stream/Util.php b/libs/React/Stream/Util.php new file mode 100644 index 00000000..c2445a63 --- /dev/null +++ b/libs/React/Stream/Util.php @@ -0,0 +1,45 @@ +emit('pipe', array($source)); + + $source->on('data', function ($data) use ($source, $dest) { + $feedMore = $dest->write($data); + + if (false === $feedMore) { + $source->pause(); + } + }); + + $dest->on('drain', function () use ($source) { + $source->resume(); + }); + + $end = isset($options['end']) ? $options['end'] : true; + if ($end && $source !== $dest) { + $source->on('end', function () use ($dest) { + $dest->end(); + }); + } + } + + public static function forwardEvents($source, $target, array $events) + { + foreach ($events as $event) { + $source->on($event, function () use ($event, $target) { + $target->emit($event, func_get_args()); + }); + } + } +} diff --git a/libs/React/Stream/WritableStream.php b/libs/React/Stream/WritableStream.php new file mode 100644 index 00000000..9118bdb1 --- /dev/null +++ b/libs/React/Stream/WritableStream.php @@ -0,0 +1,40 @@ +write($data); + } + + $this->close(); + } + + public function isWritable() + { + return !$this->closed; + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + $this->emit('end', array($this)); + $this->emit('close', array($this)); + $this->removeAllListeners(); + } +} diff --git a/libs/React/Stream/WritableStreamInterface.php b/libs/React/Stream/WritableStreamInterface.php new file mode 100644 index 00000000..84b93ee6 --- /dev/null +++ b/libs/React/Stream/WritableStreamInterface.php @@ -0,0 +1,19 @@ +