263 lines
6.0 KiB
PHP
263 lines
6.0 KiB
PHP
<?php
|
|
|
|
namespace React\EventLoop;
|
|
|
|
use React\EventLoop\Tick\FutureTickQueue;
|
|
use React\EventLoop\Tick\NextTickQueue;
|
|
use React\EventLoop\Timer\Timer;
|
|
use React\EventLoop\Timer\TimerInterface;
|
|
use React\EventLoop\Timer\Timers;
|
|
|
|
/**
|
|
* A stream_select() based React.
|
|
*/
|
|
class StreamSelectLoop implements LoopInterface
|
|
{
|
|
const MICROSECONDS_PER_SECOND = 1000000;
|
|
|
|
private $nextTickQueue;
|
|
private $futureTickQueue;
|
|
private $timers;
|
|
private $readStreams = [];
|
|
private $readListeners = [];
|
|
private $writeStreams = [];
|
|
private $writeListeners = [];
|
|
private $running;
|
|
|
|
public function __construct()
|
|
{
|
|
$this->nextTickQueue = new NextTickQueue($this);
|
|
$this->futureTickQueue = new FutureTickQueue($this);
|
|
$this->timers = new Timers();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function addReadStream($stream, callable $listener)
|
|
{
|
|
$key = (int) $stream;
|
|
|
|
if (!isset($this->readStreams[$key])) {
|
|
$this->readStreams[$key] = $stream;
|
|
$this->readListeners[$key] = $listener;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function addWriteStream($stream, callable $listener)
|
|
{
|
|
$key = (int) $stream;
|
|
|
|
if (!isset($this->writeStreams[$key])) {
|
|
$this->writeStreams[$key] = $stream;
|
|
$this->writeListeners[$key] = $listener;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function removeReadStream($stream)
|
|
{
|
|
$key = (int) $stream;
|
|
|
|
unset(
|
|
$this->readStreams[$key],
|
|
$this->readListeners[$key]
|
|
);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function removeWriteStream($stream)
|
|
{
|
|
$key = (int) $stream;
|
|
|
|
unset(
|
|
$this->writeStreams[$key],
|
|
$this->writeListeners[$key]
|
|
);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function removeStream($stream)
|
|
{
|
|
$this->removeReadStream($stream);
|
|
$this->removeWriteStream($stream);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function addTimer($interval, callable $callback)
|
|
{
|
|
$timer = new Timer($this, $interval, $callback, false);
|
|
|
|
$this->timers->add($timer);
|
|
|
|
return $timer;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function addPeriodicTimer($interval, callable $callback)
|
|
{
|
|
$timer = new Timer($this, $interval, $callback, true);
|
|
|
|
$this->timers->add($timer);
|
|
|
|
return $timer;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function cancelTimer(TimerInterface $timer)
|
|
{
|
|
$this->timers->cancel($timer);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function isTimerActive(TimerInterface $timer)
|
|
{
|
|
return $this->timers->contains($timer);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function nextTick(callable $listener)
|
|
{
|
|
$this->nextTickQueue->add($listener);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function futureTick(callable $listener)
|
|
{
|
|
$this->futureTickQueue->add($listener);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function tick()
|
|
{
|
|
$this->nextTickQueue->tick();
|
|
|
|
$this->futureTickQueue->tick();
|
|
|
|
$this->timers->tick();
|
|
|
|
$this->waitForStreamActivity(0);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function run()
|
|
{
|
|
$this->running = true;
|
|
|
|
while ($this->running) {
|
|
$this->nextTickQueue->tick();
|
|
|
|
$this->futureTickQueue->tick();
|
|
|
|
$this->timers->tick();
|
|
|
|
// Next-tick or future-tick queues have pending callbacks ...
|
|
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
|
|
$timeout = 0;
|
|
|
|
// There is a pending timer, only block until it is due ...
|
|
} elseif ($scheduledAt = $this->timers->getFirst()) {
|
|
$timeout = $scheduledAt - $this->timers->getTime();
|
|
if ($timeout < 0) {
|
|
$timeout = 0;
|
|
} else {
|
|
$timeout *= self::MICROSECONDS_PER_SECOND;
|
|
}
|
|
|
|
// The only possible event is stream activity, so wait forever ...
|
|
} elseif ($this->readStreams || $this->writeStreams) {
|
|
$timeout = null;
|
|
|
|
// There's nothing left to do ...
|
|
} else {
|
|
break;
|
|
}
|
|
|
|
$this->waitForStreamActivity($timeout);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function stop()
|
|
{
|
|
$this->running = false;
|
|
}
|
|
|
|
/**
|
|
* Wait/check for stream activity, or until the next timer is due.
|
|
*/
|
|
private function waitForStreamActivity($timeout)
|
|
{
|
|
$read = $this->readStreams;
|
|
$write = $this->writeStreams;
|
|
|
|
$this->streamSelect($read, $write, $timeout);
|
|
|
|
foreach ($read as $stream) {
|
|
$key = (int) $stream;
|
|
|
|
if (isset($this->readListeners[$key])) {
|
|
call_user_func($this->readListeners[$key], $stream, $this);
|
|
}
|
|
}
|
|
|
|
foreach ($write as $stream) {
|
|
$key = (int) $stream;
|
|
|
|
if (isset($this->writeListeners[$key])) {
|
|
call_user_func($this->writeListeners[$key], $stream, $this);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Emulate a stream_select() implementation that does not break when passed
|
|
* empty stream arrays.
|
|
*
|
|
* @param array &$read An array of read streams to select upon.
|
|
* @param array &$write An array of write streams to select upon.
|
|
* @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
|
|
*
|
|
* @return integer The total number of streams that are ready for read/write.
|
|
*/
|
|
protected function streamSelect(array &$read, array &$write, $timeout)
|
|
{
|
|
if ($read || $write) {
|
|
$except = null;
|
|
|
|
return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
|
|
}
|
|
|
|
usleep($timeout);
|
|
|
|
return 0;
|
|
}
|
|
}
|