263 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			263 lines
		
	
	
		
			6.0 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
<?php
 | 
						|
 | 
						|
namespace React\EventLoop;
 | 
						|
 | 
						|
use React\EventLoop\Tick\FutureTickQueue;
 | 
						|
use React\EventLoop\Tick\NextTickQueue;
 | 
						|
use React\EventLoop\Timer\Timer;
 | 
						|
use React\EventLoop\Timer\TimerInterface;
 | 
						|
use React\EventLoop\Timer\Timers;
 | 
						|
 | 
						|
/**
 | 
						|
 * A stream_select() based React event loop.
 | 
						|
 */
 | 
						|
class StreamSelectLoop implements LoopInterface
{
    /**
     * Factor used to convert timer offsets (seconds) into the microsecond
     * timeouts handed to stream_select() and usleep().
     */
    const MICROSECONDS_PER_SECOND = 1000000;

    /** @var NextTickQueue Callbacks queued through nextTick(). */
    private $nextTickQueue;

    /** @var FutureTickQueue Callbacks queued through futureTick(). */
    private $futureTickQueue;

    /** @var Timers Registry of one-off and periodic timers. */
    private $timers;

    /** @var resource[] Streams watched for readability, keyed by (int) $stream. */
    private $readStreams = [];

    /** @var callable[] Read listeners, keyed by (int) $stream. */
    private $readListeners = [];

    /** @var resource[] Streams watched for writability, keyed by (int) $stream. */
    private $writeStreams = [];

    /** @var callable[] Write listeners, keyed by (int) $stream. */
    private $writeListeners = [];

    /** @var bool True while run() is looping; cleared by stop(). */
    private $running = false;

    /**
     * Create the tick queues and the timer registry used by this loop.
     */
    public function __construct()
    {
        $this->nextTickQueue = new NextTickQueue($this);
        $this->futureTickQueue = new FutureTickQueue($this);
        $this->timers = new Timers();
    }

    /**
     * {@inheritdoc}
     *
     * A stream that is already registered for reading keeps its existing
     * listener; the new listener is ignored.
     */
    public function addReadStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->readStreams[$key])) {
            $this->readStreams[$key] = $stream;
            $this->readListeners[$key] = $listener;
        }
    }

    /**
     * {@inheritdoc}
     *
     * A stream that is already registered for writing keeps its existing
     * listener; the new listener is ignored.
     */
    public function addWriteStream($stream, callable $listener)
    {
        $key = (int) $stream;

        if (!isset($this->writeStreams[$key])) {
            $this->writeStreams[$key] = $stream;
            $this->writeListeners[$key] = $listener;
        }
    }

    /**
     * {@inheritdoc}
     *
     * Removing a stream that is not registered is a no-op.
     */
    public function removeReadStream($stream)
    {
        $key = (int) $stream;

        unset(
            $this->readStreams[$key],
            $this->readListeners[$key]
        );
    }

    /**
     * {@inheritdoc}
     *
     * Removing a stream that is not registered is a no-op.
     */
    public function removeWriteStream($stream)
    {
        $key = (int) $stream;

        unset(
            $this->writeStreams[$key],
            $this->writeListeners[$key]
        );
    }

    /**
     * {@inheritdoc}
     *
     * Drops both the read and the write registration of the stream.
     */
    public function removeStream($stream)
    {
        $this->removeReadStream($stream);
        $this->removeWriteStream($stream);
    }

    /**
     * {@inheritdoc}
     *
     * Creates a non-periodic Timer, registers it and returns it.
     */
    public function addTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, false);

        $this->timers->add($timer);

        return $timer;
    }

    /**
     * {@inheritdoc}
     *
     * Creates a periodic Timer, registers it and returns it.
     */
    public function addPeriodicTimer($interval, callable $callback)
    {
        $timer = new Timer($this, $interval, $callback, true);

        $this->timers->add($timer);

        return $timer;
    }

    /**
     * {@inheritdoc}
     */
    public function cancelTimer(TimerInterface $timer)
    {
        $this->timers->cancel($timer);
    }

    /**
     * {@inheritdoc}
     */
    public function isTimerActive(TimerInterface $timer)
    {
        return $this->timers->contains($timer);
    }

    /**
     * {@inheritdoc}
     */
    public function nextTick(callable $listener)
    {
        $this->nextTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     */
    public function futureTick(callable $listener)
    {
        $this->futureTickQueue->add($listener);
    }

    /**
     * {@inheritdoc}
     *
     * Processes both tick queues and the timers once, then polls the
     * registered streams with a zero timeout.
     */
    public function tick()
    {
        $this->nextTickQueue->tick();

        $this->futureTickQueue->tick();

        $this->timers->tick();

        $this->waitForStreamActivity(0);
    }

    /**
     * {@inheritdoc}
     *
     * Loops until stop() is called or until there are no queued callbacks,
     * no pending timers and no registered streams left.
     */
    public function run()
    {
        $this->running = true;

        while ($this->running) {
            $this->nextTickQueue->tick();

            $this->futureTickQueue->tick();

            $this->timers->tick();

            // Next-tick or future-tick queues have pending callbacks ...
            if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
                $timeout = 0;

            // There is a pending timer, only block until it is due ...
            } elseif ($scheduledAt = $this->timers->getFirst()) {
                $timeout = $scheduledAt - $this->timers->getTime();
                if ($timeout < 0) {
                    $timeout = 0;
                } else {
                    // Convert float seconds to integer microseconds. The value
                    // is capped so that a far-away timer cannot exceed the
                    // platform integer range when cast.
                    $timeout *= self::MICROSECONDS_PER_SECOND;
                    $timeout = $timeout > PHP_INT_MAX ? PHP_INT_MAX : (int) $timeout;
                }

            // The only possible event is stream activity, so wait forever ...
            } elseif ($this->readStreams || $this->writeStreams) {
                $timeout = null;

            // There's nothing left to do ...
            } else {
                break;
            }

            $this->waitForStreamActivity($timeout);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function stop()
    {
        $this->running = false;
    }

    /**
     * Wait/check for stream activity, or until the next timer is due.
     *
     * Listeners of the streams reported as ready are invoked with the stream
     * and this loop as arguments.
     *
     * @param integer|null $timeout Timeout in microseconds, or null to wait forever.
     */
    private function waitForStreamActivity($timeout)
    {
        $read  = $this->readStreams;
        $write = $this->writeStreams;

        $available = $this->streamSelect($read, $write, $timeout);
        if (false === $available) {
            // stream_select() failed (e.g. the system call was interrupted).
            // In that case $read and $write still list every registered
            // stream, so dispatching would notify listeners that have
            // nothing to do.
            return;
        }

        foreach ($read as $stream) {
            $key = (int) $stream;

            // A previously invoked listener may have removed this stream.
            if (isset($this->readListeners[$key])) {
                call_user_func($this->readListeners[$key], $stream, $this);
            }
        }

        foreach ($write as $stream) {
            $key = (int) $stream;

            // A previously invoked listener may have removed this stream.
            if (isset($this->writeListeners[$key])) {
                call_user_func($this->writeListeners[$key], $stream, $this);
            }
        }
    }

    /**
     * Emulate a stream_select() implementation that does not break when passed
     * empty stream arrays.
     *
     * When both arrays are empty the call sleeps for the given timeout
     * instead; a null or non-positive timeout returns immediately.
     *
     * @param array        &$read   An array of read streams to select upon.
     * @param array        &$write  An array of write streams to select upon.
     * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
     *
     * @return integer|false The total number of streams that are ready for read/write,
     *                       or false if stream_select() failed (e.g. it was interrupted).
     */
    protected function streamSelect(array &$read, array &$write, $timeout)
    {
        if ($read || $write) {
            $except = null;

            // Seconds and microseconds are either both null (block
            // indefinitely) or 0 seconds plus the integer microsecond count.
            $seconds = $timeout === null ? null : 0;
            $microseconds = $timeout === null ? null : (int) $timeout;

            return stream_select($read, $write, $except, $seconds, $microseconds);
        }

        // Nothing to select on: just sleep. usleep() needs a non-negative
        // integer, so null and non-positive timeouts are skipped.
        if ($timeout > 0) {
            usleep((int) $timeout);
        }

        return 0;
    }
}
 |