328 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			328 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
| <?php
 | |
| 
 | |
| namespace React\EventLoop;
 | |
| 
 | |
| use Event;
 | |
| use EventBase;
 | |
| use EventConfig as EventBaseConfig;
 | |
| use React\EventLoop\Tick\FutureTickQueue;
 | |
| use React\EventLoop\Tick\NextTickQueue;
 | |
| use React\EventLoop\Timer\Timer;
 | |
| use React\EventLoop\Timer\TimerInterface;
 | |
| use SplObjectStorage;
 | |
| 
 | |
| /**
 | |
|  * An ext-event based React.
 | |
|  */
 | |
| class ExtEventLoop implements LoopInterface
 | |
| {
 | |
|     private $eventBase;
 | |
|     private $nextTickQueue;
 | |
|     private $futureTickQueue;
 | |
|     private $timerCallback;
 | |
|     private $timerEvents;
 | |
|     private $streamCallback;
 | |
|     private $streamEvents = [];
 | |
|     private $streamFlags = [];
 | |
|     private $readListeners = [];
 | |
|     private $writeListeners = [];
 | |
|     private $running;
 | |
| 
 | |
|     public function __construct(?EventBaseConfig $config = null)
 | |
|     {
 | |
|         $this->eventBase = new EventBase($config);
 | |
|         $this->nextTickQueue = new NextTickQueue($this);
 | |
|         $this->futureTickQueue = new FutureTickQueue($this);
 | |
|         $this->timerEvents = new SplObjectStorage();
 | |
| 
 | |
|         $this->createTimerCallback();
 | |
|         $this->createStreamCallback();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function addReadStream($stream, callable $listener)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (!isset($this->readListeners[$key])) {
 | |
|             $this->readListeners[$key] = $listener;
 | |
|             $this->subscribeStreamEvent($stream, Event::READ);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function addWriteStream($stream, callable $listener)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (!isset($this->writeListeners[$key])) {
 | |
|             $this->writeListeners[$key] = $listener;
 | |
|             $this->subscribeStreamEvent($stream, Event::WRITE);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function removeReadStream($stream)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (isset($this->readListeners[$key])) {
 | |
|             unset($this->readListeners[$key]);
 | |
|             $this->unsubscribeStreamEvent($stream, Event::READ);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function removeWriteStream($stream)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (isset($this->writeListeners[$key])) {
 | |
|             unset($this->writeListeners[$key]);
 | |
|             $this->unsubscribeStreamEvent($stream, Event::WRITE);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function removeStream($stream)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (isset($this->streamEvents[$key])) {
 | |
|             $this->streamEvents[$key]->free();
 | |
| 
 | |
|             unset(
 | |
|                 $this->streamFlags[$key],
 | |
|                 $this->streamEvents[$key],
 | |
|                 $this->readListeners[$key],
 | |
|                 $this->writeListeners[$key]
 | |
|             );
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function addTimer($interval, callable $callback)
 | |
|     {
 | |
|         $timer = new Timer($this, $interval, $callback, false);
 | |
| 
 | |
|         $this->scheduleTimer($timer);
 | |
| 
 | |
|         return $timer;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function addPeriodicTimer($interval, callable $callback)
 | |
|     {
 | |
|         $timer = new Timer($this, $interval, $callback, true);
 | |
| 
 | |
|         $this->scheduleTimer($timer);
 | |
| 
 | |
|         return $timer;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function cancelTimer(TimerInterface $timer)
 | |
|     {
 | |
|         if ($this->isTimerActive($timer)) {
 | |
|             $this->timerEvents[$timer]->free();
 | |
|             $this->timerEvents->detach($timer);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function isTimerActive(TimerInterface $timer)
 | |
|     {
 | |
|         return $this->timerEvents->contains($timer);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function nextTick(callable $listener)
 | |
|     {
 | |
|         $this->nextTickQueue->add($listener);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function futureTick(callable $listener)
 | |
|     {
 | |
|         $this->futureTickQueue->add($listener);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function tick()
 | |
|     {
 | |
|         $this->nextTickQueue->tick();
 | |
| 
 | |
|         $this->futureTickQueue->tick();
 | |
| 
 | |
|         // @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
 | |
|         @$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function run()
 | |
|     {
 | |
|         $this->running = true;
 | |
| 
 | |
|         while ($this->running) {
 | |
|             $this->nextTickQueue->tick();
 | |
| 
 | |
|             $this->futureTickQueue->tick();
 | |
| 
 | |
|             $flags = EventBase::LOOP_ONCE;
 | |
|             if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
 | |
|                 $flags |= EventBase::LOOP_NONBLOCK;
 | |
|             } elseif (!$this->streamEvents && !$this->timerEvents->count()) {
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|             // @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
 | |
|             @$this->eventBase->loop($flags);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritdoc}
 | |
|      */
 | |
|     public function stop()
 | |
|     {
 | |
|         $this->running = false;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Schedule a timer for execution.
 | |
|      *
 | |
|      * @param TimerInterface $timer
 | |
|      */
 | |
|     private function scheduleTimer(TimerInterface $timer)
 | |
|     {
 | |
|         $flags = Event::TIMEOUT;
 | |
| 
 | |
|         if ($timer->isPeriodic()) {
 | |
|             $flags |= Event::PERSIST;
 | |
|         }
 | |
| 
 | |
|         $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
 | |
|         $this->timerEvents[$timer] = $event;
 | |
| 
 | |
|         $event->add($timer->getInterval());
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Create a new ext-event Event object, or update the existing one.
 | |
|      *
 | |
|      * @param resource $stream
 | |
|      * @param integer  $flag   Event::READ or Event::WRITE
 | |
|      */
 | |
|     private function subscribeStreamEvent($stream, $flag)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         if (isset($this->streamEvents[$key])) {
 | |
|             $event = $this->streamEvents[$key];
 | |
|             $flags = ($this->streamFlags[$key] |= $flag);
 | |
| 
 | |
|             $event->del();
 | |
|             $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
 | |
|         } else {
 | |
|             $event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
 | |
| 
 | |
|             $this->streamEvents[$key] = $event;
 | |
|             $this->streamFlags[$key] = $flag;
 | |
|         }
 | |
| 
 | |
|         $event->add();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Update the ext-event Event object for this stream to stop listening to
 | |
|      * the given event type, or remove it entirely if it's no longer needed.
 | |
|      *
 | |
|      * @param resource $stream
 | |
|      * @param integer  $flag   Event::READ or Event::WRITE
 | |
|      */
 | |
|     private function unsubscribeStreamEvent($stream, $flag)
 | |
|     {
 | |
|         $key = (int) $stream;
 | |
| 
 | |
|         $flags = $this->streamFlags[$key] &= ~$flag;
 | |
| 
 | |
|         if (0 === $flags) {
 | |
|             $this->removeStream($stream);
 | |
| 
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         $event = $this->streamEvents[$key];
 | |
| 
 | |
|         $event->del();
 | |
|         $event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
 | |
|         $event->add();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Create a callback used as the target of timer events.
 | |
|      *
 | |
|      * A reference is kept to the callback for the lifetime of the loop
 | |
|      * to prevent "Cannot destroy active lambda function" fatal error from
 | |
|      * the event extension.
 | |
|      */
 | |
|     private function createTimerCallback()
 | |
|     {
 | |
|         $this->timerCallback = function ($_, $_, $timer) {
 | |
|             call_user_func($timer->getCallback(), $timer);
 | |
| 
 | |
|             if (!$timer->isPeriodic() && $this->isTimerActive($timer)) {
 | |
|                 $this->cancelTimer($timer);
 | |
|             }
 | |
|         };
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Create a callback used as the target of stream events.
 | |
|      *
 | |
|      * A reference is kept to the callback for the lifetime of the loop
 | |
|      * to prevent "Cannot destroy active lambda function" fatal error from
 | |
|      * the event extension.
 | |
|      */
 | |
|     private function createStreamCallback()
 | |
|     {
 | |
|         $this->streamCallback = function ($stream, $flags) {
 | |
|             $key = (int) $stream;
 | |
| 
 | |
|             if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) {
 | |
|                 call_user_func($this->readListeners[$key], $stream, $this);
 | |
|             }
 | |
| 
 | |
|             if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) {
 | |
|                 call_user_func($this->writeListeners[$key], $stream, $this);
 | |
|             }
 | |
|         };
 | |
|     }
 | |
| }
 |