344 lines
8.5 KiB
PHP
344 lines
8.5 KiB
PHP
<?php
|
|
|
|
namespace React\EventLoop;
|
|
|
|
use Event;
|
|
use EventBase;
|
|
use React\EventLoop\Tick\FutureTickQueue;
|
|
use React\EventLoop\Tick\NextTickQueue;
|
|
use React\EventLoop\Timer\Timer;
|
|
use React\EventLoop\Timer\TimerInterface;
|
|
use SplObjectStorage;
|
|
|
|
/**
 * An ext-libevent based event loop.
 *
 * Stream and timer notifications are delegated to a libevent event base,
 * while the "next tick" and "future tick" queues are processed by this class
 * before every iteration of the underlying base.
 */
class LibEventLoop implements LoopInterface
{
    const MICROSECONDS_PER_SECOND = 1000000;

    /** @var resource Event base created by event_base_new(); every event is attached to it. */
    private $eventBase;

    /** @var NextTickQueue Callbacks queued through nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Callbacks queued through futureTick(). */
    private $futureTickQueue;

    /** @var \Closure Shared callback handed to libevent for every timer event. */
    private $timerCallback;

    /** @var SplObjectStorage Maps each active TimerInterface to its libevent event resource. */
    private $timerEvents;

    /** @var \Closure Shared callback handed to libevent for every stream event. */
    private $streamCallback;

    /** @var resource[] libevent event resources, keyed by (int) $stream. */
    private $streamEvents = [];

    /** @var int[] Currently subscribed EV_READ / EV_WRITE bitmask, keyed by (int) $stream. */
    private $streamFlags = [];

    /** @var callable[] Read listeners, keyed by (int) $stream. */
    private $readListeners = [];

    /** @var callable[] Write listeners, keyed by (int) $stream. */
    private $writeListeners = [];

    /** @var bool|null True while run() is looping; reset to false by stop(). */
    private $running;

    /**
     * Create the event base, the tick queues, the timer storage and the two
     * shared callbacks that are passed to libevent.
     */
    public function __construct()
    {
        $this->eventBase = event_base_new();
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timerEvents = new SplObjectStorage();

        $this->createTimerCallback();
        $this->createStreamCallback();
    }

    /**
     * {@inheritdoc}
     *
     * If a read listener is already registered for the stream, the call is
     * ignored and the existing listener is kept.
     */
    public function addReadStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->readListeners[$key])) {
            $this->readListeners[$key] = $listener;
            $this->subscribeStreamEvent($stream, EV_READ);
        }
    }

    /**
     * {@inheritdoc}
     *
     * If a write listener is already registered for the stream, the call is
     * ignored and the existing listener is kept.
     */
    public function addWriteStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->writeListeners[$key])) {
            $this->writeListeners[$key] = $listener;
            $this->subscribeStreamEvent($stream, EV_WRITE);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when no read listener is registered for the stream.
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->readListeners[$key])) {
            unset($this->readListeners[$key]);
            $this->unsubscribeStreamEvent($stream, EV_READ);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when no write listener is registered for the stream.
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->writeListeners[$key])) {
            unset($this->writeListeners[$key]);
            $this->unsubscribeStreamEvent($stream, EV_WRITE);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Deletes and frees the libevent event of the stream and forgets its
     * flags and both listeners. Does nothing for an unknown stream.
     */
    public function removeStream($stream)
    {
        $key = (int) $stream;

        if (isset($this->streamEvents[$key])) {
            $event = $this->streamEvents[$key];

            event_del($event);
            event_free($event);

            unset(
                $this->streamFlags[$key],
                $this->streamEvents[$key],
                $this->readListeners[$key],
                $this->writeListeners[$key]
            );
        }
    }

    /**
     * {@inheritdoc}
     */
    public function addTimer($interval, callable $callback)
    {
        // Last constructor argument marks the timer as non-periodic.
        $timer = new Timer($this, $interval, $callback, false);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        // Last constructor argument marks the timer as periodic.
        $timer = new Timer($this, $interval, $callback, true);

        $this->scheduleTimer($timer);

        return $timer;
    }

    /**
     * {@inheritdoc}
     *
     * Does nothing when the timer is not active in this loop.
     */
    public function cancelTimer(TimerInterface $timer)
    {
        if ($this->isTimerActive($timer)) {
            $event = $this->timerEvents[$timer];

            event_del($event);
            event_free($event);

            $this->timerEvents->detach($timer);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function isTimerActive(TimerInterface $timer)
    {
        return $this->timerEvents->contains($timer);
    }

    /**
     * {@inheritdoc}
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     *
     * Processes both tick queues, then runs one non-blocking iteration of the
     * event base.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();

        $this->futureTickQueue->tick();

        event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
    }

    /**
     * {@inheritdoc}
     *
     * Loops until stop() is called or until there is nothing left to wait
     * for (no queued ticks, no stream events and no timers).
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();

            $this->futureTickQueue->tick();

            $flags = EVLOOP_ONCE;
            if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
                // The loop was stopped or tick callbacks are still pending:
                // poll the event base without blocking.
                $flags |= EVLOOP_NONBLOCK;
            } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
                // No ticks, streams or timers remain, so there is nothing to wait for.
                break;
            }

            event_base_loop($this->eventBase, $flags);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Schedule a timer for execution.
     *
     * Creates a libevent timer event for the timer, remembers it in the timer
     * storage and adds it to the event base with the timer interval converted
     * from seconds to microseconds.
     *
     * @param TimerInterface $timer
     */
    private function scheduleTimer(TimerInterface $timer)
    {
        $this->timerEvents[$timer] = $event = event_timer_new();

        // The timer object is passed as the callback argument so the shared
        // timer callback can tell which timer fired.
        event_timer_set($event, $this->timerCallback, $timer);
        event_base_set($event, $this->eventBase);
        event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
    }

    /**
     * Create a new ext-libevent event resource, or update the existing one.
     *
     * @param resource $stream
     * @param integer  $flag   EV_READ or EV_WRITE
     */
    private function subscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        if (isset($this->streamEvents[$key])) {
            $event = $this->streamEvents[$key];
            $flags = $this->streamFlags[$key] |= $flag;

            // Take the event off the base and reconfigure it with the merged
            // flag set; it is re-added below.
            event_del($event);
            event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
        } else {
            $event = event_new();

            event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);
            event_base_set($event, $this->eventBase);

            $this->streamEvents[$key] = $event;
            $this->streamFlags[$key] = $flag;
        }

        event_add($event);
    }

    /**
     * Update the ext-libevent event resource for this stream to stop listening to
     * the given event type, or remove it entirely if it's no longer needed.
     *
     * @param resource $stream
     * @param integer  $flag   EV_READ or EV_WRITE
     */
    private function unsubscribeStreamEvent($stream, $flag)
    {
        $key = (int) $stream;

        // Clear the given bit from the stored bitmask for this stream.
        $flags = $this->streamFlags[$key] &= ~$flag;

        if (0 === $flags) {
            // Neither read nor write is wanted any more: drop the stream entirely.
            $this->removeStream($stream);

            return;
        }

        $event = $this->streamEvents[$key];

        // Re-register the event with the remaining flag set.
        event_del($event);
        event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
        event_add($event);
    }

    /**
     * Create a callback used as the target of timer events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createTimerCallback()
    {
        // The first two arguments supplied by the extension are unused. They
        // must still carry distinct names: PHP 7.0+ rejects a parameter list
        // that declares the same variable name twice.
        $this->timerCallback = function ($_, $__, $timer) {
            call_user_func($timer->getCallback(), $timer);

            // Timer already cancelled ...
            if (!$this->isTimerActive($timer)) {
                return;

            // Reschedule periodic timers ...
            } elseif ($timer->isPeriodic()) {
                event_add(
                    $this->timerEvents[$timer],
                    $timer->getInterval() * self::MICROSECONDS_PER_SECOND
                );

            // Clean-up one shot timers ...
            } else {
                $this->cancelTimer($timer);
            }
        };
    }

    /**
     * Create a callback used as the target of stream events.
     *
     * A reference is kept to the callback for the lifetime of the loop
     * to prevent "Cannot destroy active lambda function" fatal error from
     * the event extension.
     */
    private function createStreamCallback()
    {
        $this->streamCallback = function ($stream, $flags) {
            $key = (int) $stream;

            if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) {
                call_user_func($this->readListeners[$key], $stream, $this);
            }

            // The listener lookup is repeated here because the read listener
            // above may have removed the write listener in the meantime.
            if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) {
                call_user_func($this->writeListeners[$key], $stream, $this);
            }
        };
    }
}
|